- Update Readme

- Add funding.yml
- Add Arr Queue cleaner worker
- Rewrote worker
This commit is contained in:
Mukhtar Akere
2025-02-19 23:52:53 +01:00
parent 9a7bff04ef
commit 108da305b3
21 changed files with 340 additions and 69 deletions

2
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,2 @@
github: sirrobot01
buy_me_a_coffee: sirrobot01

View File

@@ -127,4 +127,12 @@
- Qbittorrent
- Add support for tags(creating, deleting, listing)
- Add support for categories(creating, deleting, listing)
- Fix issues with arr sending torrents using a different content type.
- Fix issues with arr sending torrents using a different content type.
#### 0.4.1
- Adds optional UI authentication
- Downloaded Torrents persist on restart
- Fixes
- Fix Alldebrid struggling to find the correct file
- Minor bug fixes or speed-gains

View File

@@ -1,8 +1,8 @@
### DecyphArr(with Debrid Proxy Support)
### DecyphArr(Qbittorrent, but with Debrid Proxy Support)
![ui](doc/main.png)
This is a Golang implementation of Torrent QbitTorrent with a **Multiple Debrid service support**.
This is an implementation of QbitTorrent with **multiple Debrid services** support. Written in Go.
### Table of Contents
@@ -37,7 +37,7 @@ This is a Golang implementation of Torrent QbitTorrent with a **Multiple Debrid
- Multi-Debrid Providers support
- Repair Worker for missing files (**NEW**)
The proxy is useful in filtering out un-cached Real Debrid torrents
The proxy is useful for filtering out un-cached Debrid torrents
### Supported Debrid Providers
- [Real Debrid](https://real-debrid.com)
@@ -99,8 +99,8 @@ Download the binary from the releases page and run it with the config file.
- Category: e.g `sonarr`, `radarr`
- Use SSL -> `No`
- Sequential Download -> `No`|`Yes` (If you want to download the torrents locally instead of symlink)
- Test
- Save
- Click Test
- Click Save
#### Basic Sample Config
@@ -187,6 +187,7 @@ This is particularly useful if you want to use the Repair tool without using Qbi
- The `name` key is the name of the Arr/ Category
- The `host` key is the host of the Arr
- The `token` key is the API token of the Arr
- The `cleanup` key is used to clean up your Arr queues. This is typically for removing dangling queue items (downloads whose files have all been imported, and sometimes incomplete season packs)
</details>

View File

@@ -10,6 +10,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/pkg/service"
"github.com/sirrobot01/debrid-blackhole/pkg/version"
"github.com/sirrobot01/debrid-blackhole/pkg/web"
"github.com/sirrobot01/debrid-blackhole/pkg/worker"
"log"
"sync"
)
@@ -57,7 +58,9 @@ func Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
_qbit.StartWorker(ctx)
if err := worker.Start(ctx); err != nil {
errChan <- err
}
}()
if cfg.Repair.Enabled {

View File

@@ -57,12 +57,14 @@
{
"name": "sonarr",
"host": "http://host:8989",
"token": "arr_key"
"token": "arr_key",
"cleanup": false
},
{
"name": "radarr",
"host": "http://host:7878",
"token": "arr_key"
"token": "arr_key",
"cleanup": false
}
],
"repair": {
@@ -76,5 +78,5 @@
"min_file_size": "",
"max_file_size": "",
"allowed_file_types": [],
"use_auth": false
"use_auth": false,
}

View File

@@ -1,4 +1,4 @@
package common
package cache
import (
"sync"
@@ -11,7 +11,7 @@ type Cache struct {
mu sync.RWMutex
}
func NewCache(maxItems int) *Cache {
func New(maxItems int) *Cache {
if maxItems <= 0 {
maxItems = 1000
}

View File

@@ -45,9 +45,10 @@ type QBitTorrent struct {
}
type Arr struct {
Name string `json:"name"`
Host string `json:"host"`
Token string `json:"token"`
Name string `json:"name"`
Host string `json:"host"`
Token string `json:"token"`
Cleanup bool `json:"cleanup"`
}
type Repair struct {

View File

@@ -25,19 +25,20 @@ var (
)
type Arr struct {
Name string `json:"name"`
Host string `json:"host"`
Token string `json:"token"`
Type Type `json:"type"`
verifiedDirs sync.Map // map[string]struct{} -> dir -> struct{}
Name string `json:"name"`
Host string `json:"host"`
Token string `json:"token"`
Type Type `json:"type"`
Cleanup bool `json:"cleanup"`
}
func NewArr(name, host, token string, arrType Type) *Arr {
func New(name, host, token string, cleanup bool) *Arr {
return &Arr{
Name: name,
Host: host,
Token: token,
Type: arrType,
Name: name,
Host: host,
Token: token,
Type: InferType(host, name),
Cleanup: cleanup,
}
}
@@ -71,7 +72,7 @@ type Storage struct {
mu sync.RWMutex
}
func inferType(host, name string) Type {
func InferType(host, name string) Type {
switch {
case strings.Contains(host, "sonarr") || strings.Contains(name, "sonarr"):
return Sonarr
@@ -90,7 +91,7 @@ func NewStorage() *Storage {
arrs := make(map[string]*Arr)
for _, a := range config.GetConfig().Arrs {
name := a.Name
arrs[name] = NewArr(name, a.Host, a.Token, inferType(a.Host, name))
arrs[name] = New(name, a.Host, a.Token, a.Cleanup)
}
return &Storage{
Arrs: arrs,

View File

@@ -112,14 +112,8 @@ func GetMovies(a *Arr, tvId string) ([]Content, error) {
return contents, nil
}
func (a *Arr) SearchMissing(files []ContentFile) error {
func (a *Arr) search(ids []int) error {
var payload interface{}
ids := make([]int, 0)
for _, f := range files {
ids = append(ids, f.Id)
}
switch a.Type {
case Sonarr:
payload = struct {
@@ -143,14 +137,27 @@ func (a *Arr) SearchMissing(files []ContentFile) error {
resp, err := a.Request(http.MethodPost, "api/v3/command", payload)
if err != nil {
return fmt.Errorf("failed to search missing: %v", err)
return fmt.Errorf("failed to automatic search: %v", err)
}
if statusOk := strconv.Itoa(resp.StatusCode)[0] == '2'; !statusOk {
return fmt.Errorf("failed to search missing. Status Code: %s", resp.Status)
return fmt.Errorf("failed to automatic search. Status Code: %s", resp.Status)
}
return nil
}
func (a *Arr) SearchMissing(files []ContentFile) error {
ids := make([]int, 0)
for _, f := range files {
ids = append(ids, f.Id)
}
if len(ids) == 0 {
return nil
}
return a.search(ids)
}
func (a *Arr) DeleteFiles(files []ContentFile) error {
ids := make([]int, 0)
for _, f := range files {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"
gourl "net/url"
"strings"
)
type HistorySchema struct {
@@ -18,6 +19,37 @@ type HistorySchema struct {
} `json:"records"`
}
type QueueResponseScheme struct {
Page int `json:"page"`
PageSize int `json:"pageSize"`
SortKey string `json:"sortKey"`
SortDirection string `json:"sortDirection"`
TotalRecords int `json:"totalRecords"`
Records []QueueSchema `json:"records"`
}
type QueueSchema struct {
SeriesId int `json:"seriesId"`
EpisodeId int `json:"episodeId"`
SeasonNumber int `json:"seasonNumber"`
Title string `json:"title"`
Status string `json:"status"`
TrackedDownloadStatus string `json:"trackedDownloadStatus"`
TrackedDownloadState string `json:"trackedDownloadState"`
StatusMessages []struct {
Title string `json:"title"`
Messages []string `json:"messages"`
} `json:"statusMessages"`
DownloadId string `json:"downloadId"`
Protocol string `json:"protocol"`
DownloadClient string `json:"downloadClient"`
DownloadClientHasPostImportCategory bool `json:"downloadClientHasPostImportCategory"`
Indexer string `json:"indexer"`
OutputPath string `json:"outputPath"`
EpisodeHasFile bool `json:"episodeHasFile"`
Id int `json:"id"`
}
func (a *Arr) GetHistory(downloadId, eventType string) *HistorySchema {
query := gourl.Values{}
if downloadId != "" {
@@ -25,7 +57,7 @@ func (a *Arr) GetHistory(downloadId, eventType string) *HistorySchema {
}
query.Add("eventType", eventType)
query.Add("pageSize", "100")
url := "history" + "?" + query.Encode()
url := "api/v3/history" + "?" + query.Encode()
resp, err := a.Request(http.MethodGet, url, nil)
if err != nil {
return nil
@@ -39,3 +71,98 @@ func (a *Arr) GetHistory(downloadId, eventType string) *HistorySchema {
return data
}
// GetQueue fetches the Arr download queue via /api/v3/queue, following
// pagination until all TotalRecords have been collected. On any request or
// decode error it returns whatever records were gathered so far.
func (a *Arr) GetQueue() []QueueSchema {
	query := gourl.Values{}
	query.Add("page", "1")
	query.Add("pageSize", "200")
	results := make([]QueueSchema, 0)
	for {
		url := "api/v3/queue" + "?" + query.Encode()
		resp, err := a.Request(http.MethodGet, url, nil)
		if err != nil {
			break
		}
		var data QueueResponseScheme
		decodeErr := json.NewDecoder(resp.Body).Decode(&data)
		// Close the body per iteration; a defer here would accumulate open
		// bodies until the function returns.
		resp.Body.Close()
		if decodeErr != nil {
			break
		}
		// Guard against an infinite loop if the server stops returning records
		// before TotalRecords is reached.
		if len(data.Records) == 0 {
			break
		}
		results = append(results, data.Records...)
		if len(results) >= data.TotalRecords {
			break
		}
		// BUG fix: string(rune(n)) yields the Unicode code point n (e.g. '\x02'),
		// not the decimal text "2"; use strconv.Itoa for the page number.
		query.Set("page", strconv.Itoa(data.Page+1))
	}
	return results
}
// CleanupQueue removes "dangling" items from the Arr queue: torrent downloads
// that completed but cannot be imported ("No files found are eligible for
// import in ..."), which otherwise sit in the queue forever. Matching items
// are bulk-deleted, removed from the download client, and blocklisted without
// re-download (the Arr can then re-search them).
func (a *Arr) CleanupQueue() error {
	queueIds := make([]int, 0)
	for _, q := range a.GetQueue() {
		// Only completed torrent downloads stuck in importPending with a
		// warning can be dangling.
		if q.Protocol != "torrent" || q.Status != "completed" ||
			q.TrackedDownloadStatus != "warning" || q.TrackedDownloadState != "importPending" {
			continue
		}
		for _, m := range q.StatusMessages {
			if strings.Contains(strings.Join(m.Messages, " "), "No files found are eligible for import in") {
				queueIds = append(queueIds, q.Id)
				break
			}
		}
	}
	if len(queueIds) == 0 {
		return nil
	}
	// Delete the messed-up items from the queue in one bulk request.
	payload := struct {
		Ids []int `json:"ids"`
	}{
		Ids: queueIds,
	}
	// Blocklist the hash (it's typically incomplete), then let the Arr re-search.
	query := gourl.Values{}
	query.Add("removeFromClient", "true")
	query.Add("blocklist", "true")
	query.Add("skipRedownload", "false")
	query.Add("changeCategory", "false")
	url := "api/v3/queue/bulk" + "?" + query.Encode()
	resp, err := a.Request(http.MethodDelete, url, payload)
	if err != nil {
		return err
	}
	// BUG fix: the response body was previously leaked; close it so the
	// transport can reuse the connection.
	resp.Body.Close()
	return nil
}

View File

@@ -22,7 +22,7 @@ func (a *Arr) Refresh() error {
return fmt.Errorf("failed to refresh monitored downloads for %s", cmp.Or(a.Name, a.Host))
}
func (a *Arr) MarkAsFailed(infoHash string) error {
func (a *Arr) Blacklist(infoHash string) error {
downloadId := strings.ToUpper(infoHash)
history := a.GetHistory(downloadId, "grabbed")
if history == nil {

View File

@@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/common"
"github.com/sirrobot01/debrid-blackhole/internal/cache"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request"
@@ -24,7 +24,7 @@ type AllDebrid struct {
APIKey string
DownloadUncached bool
client *request.RLHTTPClient
cache *common.Cache
cache *cache.Cache
MountPath string
logger zerolog.Logger
CheckCached bool
@@ -278,7 +278,7 @@ func (ad *AllDebrid) GetTorrents() ([]*torrent.Torrent, error) {
return nil, fmt.Errorf("not implemented")
}
func New(dc config.Debrid, cache *common.Cache) *AllDebrid {
func New(dc config.Debrid, cache *cache.Cache) *AllDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),

View File

@@ -3,7 +3,7 @@ package debrid
import (
"cmp"
"fmt"
"github.com/sirrobot01/debrid-blackhole/common"
"github.com/sirrobot01/debrid-blackhole/internal/cache"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
@@ -23,7 +23,7 @@ func New() *engine.Engine {
maxCacheSize := maxCachedSize / len(cfg.Debrids)
for _, dc := range cfg.Debrids {
d := createDebrid(dc, common.NewCache(maxCacheSize))
d := createDebrid(dc, cache.New(maxCacheSize))
logger := d.GetLogger()
logger.Info().Msg("Debrid Service started")
debrids = append(debrids, d)
@@ -32,7 +32,7 @@ func New() *engine.Engine {
return d
}
func createDebrid(dc config.Debrid, cache *common.Cache) engine.Service {
func createDebrid(dc config.Debrid, cache *cache.Cache) engine.Service {
switch dc.Name {
case "realdebrid":
return realdebrid.New(dc, cache)

View File

@@ -5,7 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/common"
"github.com/sirrobot01/debrid-blackhole/internal/cache"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request"
@@ -23,7 +23,7 @@ type DebridLink struct {
APIKey string
DownloadUncached bool
client *request.RLHTTPClient
cache *common.Cache
cache *cache.Cache
MountPath string
logger zerolog.Logger
CheckCached bool
@@ -268,7 +268,7 @@ func (dl *DebridLink) GetCheckCached() bool {
return dl.CheckCached
}
func New(dc config.Debrid, cache *common.Cache) *DebridLink {
func New(dc config.Debrid, cache *cache.Cache) *DebridLink {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),

View File

@@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/common"
"github.com/sirrobot01/debrid-blackhole/internal/cache"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request"
@@ -25,7 +25,7 @@ type RealDebrid struct {
APIKey string
DownloadUncached bool
client *request.RLHTTPClient
cache *common.Cache
cache *cache.Cache
MountPath string
logger zerolog.Logger
CheckCached bool
@@ -380,7 +380,7 @@ func (r *RealDebrid) GetTorrents() ([]*torrent.Torrent, error) {
}
func New(dc config.Debrid, cache *common.Cache) *RealDebrid {
func New(dc config.Debrid, cache *cache.Cache) *RealDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),

View File

@@ -5,7 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/common"
"github.com/sirrobot01/debrid-blackhole/internal/cache"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/request"
@@ -29,7 +29,7 @@ type Torbox struct {
APIKey string
DownloadUncached bool
client *request.RLHTTPClient
cache *common.Cache
cache *cache.Cache
MountPath string
logger zerolog.Logger
CheckCached bool
@@ -331,7 +331,7 @@ func (tb *Torbox) GetTorrents() ([]*torrent.Torrent, error) {
return nil, fmt.Errorf("not implemented")
}
func New(dc config.Debrid, cache *common.Cache) *Torbox {
func New(dc config.Debrid, cache *cache.Cache) *Torbox {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{
"Authorization": fmt.Sprintf("Bearer %s", dc.APIKey),

View File

@@ -2,7 +2,7 @@ package torrent
import (
"fmt"
"github.com/sirrobot01/debrid-blackhole/common"
"github.com/sirrobot01/debrid-blackhole/internal/cache"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
"os"
@@ -107,7 +107,7 @@ func (t *Torrent) GetFile(id string) *File {
return nil
}
func GetLocalCache(infohashes []string, cache *common.Cache) ([]string, map[string]bool) {
func GetLocalCache(infohashes []string, cache *cache.Cache) ([]string, map[string]bool) {
result := make(map[string]bool)
hashes := make([]string, 0)

View File

@@ -56,14 +56,17 @@ func (q *QBit) authContext(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
host, token, err := decodeAuthHeader(r.Header.Get("Authorization"))
category := r.Context().Value("category").(string)
a := &arr.Arr{
Name: category,
svc := service.GetService()
// Check if arr exists
a := svc.Arr.Get(category)
if a == nil {
a = arr.New(category, "", "", false)
}
if err == nil {
a.Host = strings.TrimSpace(host)
a.Token = strings.TrimSpace(token)
}
svc := service.GetService()
svc.Arr.AddOrUpdate(a)
ctx := context.WithValue(r.Context(), "arr", a)
next.ServeHTTP(w, r.WithContext(ctx))

View File

@@ -309,7 +309,7 @@ func (ui *Handler) handleAddContent(w http.ResponseWriter, r *http.Request) {
_arr := svc.Arr.Get(arrName)
if _arr == nil {
_arr = arr.NewArr(arrName, "", "", arr.Sonarr)
_arr = arr.New(arrName, "", "", false)
}
// Handle URLs

View File

@@ -4,12 +4,8 @@ import (
"context"
"fmt"
"github.com/go-chi/chi/v5"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/pkg/service"
"html/template"
"net/http"
"os"
"sync"
)
@@ -18,15 +14,15 @@ type WebDav struct {
}
func New() *WebDav {
svc := service.GetService()
cfg := config.GetConfig()
//svc := service.GetService()
//cfg := config.GetConfig()
w := &WebDav{
Handlers: make([]*Handler, 0),
}
for name, c := range svc.DebridCache.GetCaches() {
h := NewHandler(name, c, logger.NewLogger(fmt.Sprintf("%s-webdav", name), cfg.LogLevel, os.Stdout))
w.Handlers = append(w.Handlers, h)
}
//for name, c := range svc.DebridCache.GetCaches() {
// h := NewHandler(name, c, logger.NewLogger(fmt.Sprintf("%s-webdav", name), cfg.LogLevel, os.Stdout))
// w.Handlers = append(w.Handlers, h)
//}
return w
}

120
pkg/worker/worker.go Normal file
View File

@@ -0,0 +1,120 @@
package worker
import (
"context"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/pkg/service"
"os"
"sync"
"time"
)
var (
_logInstance zerolog.Logger
once sync.Once
)
// getLogger lazily constructs the package-wide worker logger exactly once
// and returns the shared instance on every subsequent call.
func getLogger() zerolog.Logger {
	once.Do(func() {
		_logInstance = logger.NewLogger("worker", config.GetConfig().LogLevel, os.Stdout)
	})
	return _logInstance
}
// Start launches the background workers (Arr refresh and queue cleanup) and
// blocks until both exit, which happens when ctx is cancelled. It always
// returns nil.
func Start(ctx context.Context) error {
	cfg := config.GetConfig()
	workers := []func(context.Context, *config.Config){
		arrRefreshWorker,
		cleanUpQueuesWorker,
	}
	var wg sync.WaitGroup
	wg.Add(len(workers))
	for _, w := range workers {
		w := w // capture per iteration (pre-Go 1.22 loop-variable semantics)
		go func() {
			defer wg.Done()
			w(ctx, cfg)
		}()
	}
	wg.Wait()
	return nil
}
// arrRefreshWorker periodically asks every configured Arr to refresh its
// monitored downloads, at cfg.QBitTorrent.RefreshInterval seconds, until ctx
// is cancelled. Overlapping runs are skipped rather than queued.
func arrRefreshWorker(ctx context.Context, cfg *config.Config) {
	_logger := getLogger()
	_logger.Debug().Msg("Refresh Worker started")
	refreshCtx := context.WithValue(ctx, "worker", "refresh")
	refreshTicker := time.NewTicker(time.Duration(cfg.QBitTorrent.RefreshInterval) * time.Second)
	// FIX: stop the ticker on exit so its timer resources are released.
	defer refreshTicker.Stop()
	var refreshMutex sync.Mutex
	for {
		select {
		case <-refreshCtx.Done():
			_logger.Debug().Msg("Refresh Worker stopped")
			return
		case <-refreshTicker.C:
			// TryLock guarantees at most one refresh runs at a time.
			if refreshMutex.TryLock() {
				go func() {
					defer refreshMutex.Unlock()
					refreshArrs()
				}()
			} else {
				_logger.Debug().Msg("Previous refresh still running, skipping this cycle")
			}
		}
	}
}
// cleanUpQueuesWorker periodically removes dangling (un-importable) items
// from the Arr queues, every 10 seconds, until ctx is cancelled. Overlapping
// runs are skipped rather than queued. cfg is accepted for signature parity
// with arrRefreshWorker; the interval is currently fixed.
func cleanUpQueuesWorker(ctx context.Context, cfg *config.Config) {
	_logger := getLogger()
	_logger.Debug().Msg("Clean up Queues Worker started")
	cleanupCtx := context.WithValue(ctx, "worker", "cleanup")
	cleanupTicker := time.NewTicker(10 * time.Second)
	// FIX: stop the ticker on exit so its timer resources are released.
	defer cleanupTicker.Stop()
	var cleanupMutex sync.Mutex
	for {
		select {
		case <-cleanupCtx.Done():
			_logger.Debug().Msg("Clean up Queues Worker stopped")
			return
		case <-cleanupTicker.C:
			if cleanupMutex.TryLock() {
				go func() {
					defer cleanupMutex.Unlock()
					cleanUpQueues()
				}()
			} else {
				// Consistent with the refresh worker: log skipped cycles.
				_logger.Debug().Msg("Previous cleanup still running, skipping this cycle")
			}
		}
	}
}
func refreshArrs() {
arrs := service.GetService().Arr
for _, arr := range arrs.GetAll() {
err := arr.Refresh()
if err != nil {
return
}
}
}
// cleanUpQueues runs queue cleanup for every Arr that opted in.
func cleanUpQueues() {
	_logger := getLogger()
	_logger.Debug().Msg("Cleaning up queues")
	for _, a := range service.GetService().Arr.GetAll() {
		// FIX: honor the per-Arr `cleanup` config flag introduced alongside
		// this worker; previously every queue was cleaned regardless of the
		// setting (which is documented as opt-in and defaults to false).
		if !a.Cleanup {
			continue
		}
		if err := a.CleanupQueue(); err != nil {
			_logger.Debug().Err(err).Msg("Error cleaning up queue")
		}
	}
}