12 Commits

Author SHA1 Message Date
Mukhtar Akere
1c06407900 Hotfixes
Some checks failed
GoReleaser / goreleaser (push) Has been cancelled
Release Docker Build / docker (push) Has been cancelled
2025-03-02 14:33:58 +01:00
Mukhtar Akere
b1a3d8b762 Minor bug fixes 2025-02-28 21:25:47 +01:00
Mukhtar Akere
0e25de0e3c Hotfix 2025-02-28 20:48:30 +01:00
Mukhtar Akere
e741a0e32b Hotfix 2025-02-28 20:21:45 +01:00
Mukhtar Akere
84bd93805f try to fix memory hogging 2025-02-28 16:05:04 +01:00
Mukhtar Akere
fce2ce28c7 Finalize workflow 2025-02-28 04:06:51 +01:00
Mukhtar Akere
302a461efd Update workflows 2025-02-28 04:03:13 +01:00
Mukhtar Akere
7eb021aac1 - Add ghcr
- Add checks for arr url and token
2025-02-28 03:57:26 +01:00
Mukhtar Akere
7a989ccf2b hotfix v0.4.2 2025-02-28 03:33:11 +01:00
Mukhtar Akere
f04d7ac86e hotfixes 2025-02-28 03:10:14 +01:00
Mukhtar Akere
65fb2d1e7c revamp deployment 2025-02-28 00:54:11 +01:00
Mukhtar Akere
46beac7227 Changelog 0.4.2 2025-02-28 00:38:31 +01:00
34 changed files with 411 additions and 566 deletions

View File

@@ -5,7 +5,7 @@ tmp_dir = "tmp"
[build]
args_bin = ["--config", "data/"]
bin = "./tmp/main"
cmd = "bash -c 'go build -ldflags \"-X github.com/sirrobot01/debrid-blackhole/pkg/version.Version=0.0.4 -X github.com/sirrobot01/debrid-blackhole/pkg/version.Channel=beta\" -o ./tmp/main .'"
cmd = "bash -c 'go build -ldflags \"-X github.com/sirrobot01/debrid-blackhole/pkg/version.Version=0.0.0 -X github.com/sirrobot01/debrid-blackhole/pkg/version.Channel=beta\" -o ./tmp/main .'"
delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata", "data"]
exclude_file = []

85
.github/workflows/beta-docker.yml vendored Normal file
View File

@@ -0,0 +1,85 @@
name: Beta Docker Build
on:
push:
branches:
- beta
permissions:
contents: read
packages: write
jobs:
docker:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Calculate beta version
id: calculate_version
run: |
LATEST_TAG=$(git tag | grep -v 'beta' | sort -V | tail -n1)
echo "Found latest tag: ${LATEST_TAG}"
IFS='.' read -r -a VERSION_PARTS <<< "$LATEST_TAG"
MAJOR="${VERSION_PARTS[0]}"
MINOR="${VERSION_PARTS[1]}"
PATCH="${VERSION_PARTS[2]}"
NEW_PATCH=$((PATCH + 1))
BETA_VERSION="${MAJOR}.${MINOR}.${NEW_PATCH}"
echo "Calculated beta version: ${BETA_VERSION}"
echo "beta_version=${BETA_VERSION}" >> $GITHUB_ENV
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
# Login to Docker Hub
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
# Login to GitHub Container Registry
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push beta Docker image
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64,linux/arm/v7
push: true
tags: |
cy01/blackhole:beta
ghcr.io/${{ github.repository_owner }}/decypharr:beta
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new,mode=max
build-args: |
VERSION=${{ env.beta_version }}
CHANNEL=beta
- name: Move cache
run: |
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache

View File

@@ -1,69 +0,0 @@
name: Docker Build and Push
on:
push:
branches:
- main
- beta
jobs:
docker:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get version
id: get_version
run: |
LATEST_TAG=$(git tag | sort -V | tail -n1)
echo "latest_tag=${LATEST_TAG}" >> $GITHUB_ENV
- name: Set channel
id: set_channel
run: |
if [[ ${{ github.ref }} == 'refs/heads/beta' ]]; then
echo "CHANNEL=beta" >> $GITHUB_ENV
else
echo "CHANNEL=stable" >> $GITHUB_ENV
fi
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push for beta branch
if: github.ref == 'refs/heads/beta'
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64,linux/arm/v7
push: true
tags: cy01/blackhole:beta
build-args: |
VERSION=${{ env.latest_tag }}
CHANNEL=${{ env.CHANNEL }}
- name: Build and push for main branch
if: github.ref == 'refs/heads/main'
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64,linux/arm/v7
push: true
tags: |
cy01/blackhole:latest
cy01/blackhole:${{ env.latest_tag }}
build-args: |
VERSION=${{ env.latest_tag }}
CHANNEL=${{ env.CHANNEL }}

View File

@@ -1,4 +1,4 @@
name: Release
name: GoReleaser
on:
push:
@@ -16,20 +16,12 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22'
- name: Set Release Channel
run: |
if [[ ${{ github.ref }} == refs/tags/beta* ]]; then
echo "RELEASE_CHANNEL=beta" >> $GITHUB_ENV
else
echo "RELEASE_CHANNEL=stable" >> $GITHUB_ENV
fi
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v5
with:
@@ -37,4 +29,5 @@ jobs:
version: latest
args: release --clean
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
RELEASE_CHANNEL: stable

77
.github/workflows/release-docker.yml vendored Normal file
View File

@@ -0,0 +1,77 @@
name: Release Docker Build
on:
push:
tags:
- '*'
permissions:
contents: read
packages: write
jobs:
docker:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Get tag name
id: get_tag
run: |
TAG_NAME=${GITHUB_REF#refs/tags/}
echo "tag_name=${TAG_NAME}" >> $GITHUB_ENV
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
# Login to Docker Hub
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
# Login to GitHub Container Registry
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push release Docker image
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/amd64,linux/arm64,linux/arm/v7
push: true
tags: |
cy01/blackhole:latest
cy01/blackhole:${{ env.tag_name }}
ghcr.io/${{ github.repository_owner }}/decypharr:latest
ghcr.io/${{ github.repository_owner }}/decypharr:${{ env.tag_name }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new,mode=max
build-args: |
VERSION=${{ env.tag_name }}
CHANNEL=stable
- name: Move cache
run: |
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache

View File

@@ -136,4 +136,12 @@
- Fixes
- Fix Alldebrid struggling to find the correct file
- Minor bug fixes or speed-gains
- A new cleanup worker to clean up ARR queues
- A new cleanup worker to clean up ARR queues
#### 0.4.2
- Hotfixes
- Fix saving torrents error
- Fix bugs with the UI
- Speed improvements

View File

@@ -1,4 +1,4 @@
### DecyphArr(Qbittorent, but with Debrid Proxy Support)
### DecyphArr(Qbittorent, but with Debrid Support)
![ui](doc/main.png)
@@ -35,7 +35,7 @@ This is an implementation of QbitTorrent with a **Multiple Debrid service suppor
- Torbox Support
- Debrid Link Support
- Multi-Debrid Providers support
- Repair Worker for missing files (**NEW**)
- Repair Worker for missing files (**BETA**)
The proxy is useful for filtering out un-cached Debrid torrents

View File

@@ -24,7 +24,7 @@ func Start(ctx context.Context) error {
_log.Info().Msgf("Version: %s", version.GetInfo().String())
_log.Debug().Msgf("Config Loaded: %s", cfg.JsonFile())
_log.Debug().Msgf("Default Log Level: %s", cfg.LogLevel)
_log.Info().Msgf("Default Log Level: %s", cfg.LogLevel)
svc := service.New()
_qbit := qbit.New()

View File

@@ -115,9 +115,9 @@ func (c *Config) loadConfig() error {
c.Auth = c.GetAuth()
//Validate the config
//if err := validateConfig(c); err != nil {
// return err
//}
if err := validateConfig(c); err != nil {
return err
}
return nil
}
@@ -143,13 +143,13 @@ func validateDebrids(debrids []Debrid) error {
}
// Check folder existence concurrently
wg.Add(1)
go func(folder string) {
defer wg.Done()
if _, err := os.Stat(folder); os.IsNotExist(err) {
errChan <- fmt.Errorf("debrid folder does not exist: %s", folder)
}
}(debrid.Folder)
//wg.Add(1)
//go func(folder string) {
// defer wg.Done()
// if _, err := os.Stat(folder); os.IsNotExist(err) {
// errChan <- fmt.Errorf("debrid folder does not exist: %s", folder)
// }
//}(debrid.Folder)
}
// Wait for all checks to complete

View File

@@ -3,6 +3,7 @@ package request
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"golang.org/x/time/rate"
"io"
@@ -109,7 +110,7 @@ func (c *RLHTTPClient) MakeRequest(req *http.Request) ([]byte, error) {
if !statusOk {
// Add status code error to the body
b = append(b, []byte(fmt.Sprintf("\nstatus code: %d", res.StatusCode))...)
return nil, fmt.Errorf(string(b))
return nil, errors.New(string(b))
}
return b, nil
@@ -160,5 +161,8 @@ func ParseRateLimit(rateStr string) *rate.Limiter {
func JSONResponse(w http.ResponseWriter, data interface{}, code int) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
json.NewEncoder(w).Encode(data)
err := json.NewEncoder(w).Encode(data)
if err != nil {
return
}
}

View File

@@ -3,6 +3,7 @@ package arr
import (
"bytes"
"encoding/json"
"fmt"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"net/http"
@@ -67,6 +68,20 @@ func (a *Arr) Request(method, endpoint string, payload interface{}) (*http.Respo
return client.Do(req)
}
func (a *Arr) Validate() error {
if a.Token == "" || a.Host == "" {
return nil
}
resp, err := a.Request("GET", "/api/v3/health", nil)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("arr test failed: %s", resp.Status)
}
return nil
}
type Storage struct {
Arrs map[string]*Arr // name -> arr
mu sync.RWMutex

View File

@@ -2,8 +2,10 @@ package arr
import (
"encoding/json"
"io"
"net/http"
gourl "net/url"
"strconv"
"strings"
)
@@ -77,24 +79,43 @@ func (a *Arr) GetQueue() []QueueSchema {
query.Add("page", "1")
query.Add("pageSize", "200")
results := make([]QueueSchema, 0)
for {
url := "api/v3/queue" + "?" + query.Encode()
resp, err := a.Request(http.MethodGet, url, nil)
if err != nil {
break
}
defer resp.Body.Close()
var data QueueResponseScheme
if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
break
}
if len(results) < data.TotalRecords {
func() {
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
return
}
}(resp.Body)
var data QueueResponseScheme
if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
return
}
results = append(results, data.Records...)
query.Set("page", string(rune(data.Page+1)))
} else {
if len(results) >= data.TotalRecords {
// We've fetched all records
err = io.EOF // Signal to exit the loop
return
}
query.Set("page", strconv.Itoa(data.Page+1))
}()
if err != nil {
break
}
}
return results
}
@@ -133,13 +154,11 @@ func (a *Arr) CleanupQueue() error {
}
queueIds := make([]int, 0)
episodesIds := make([]int, 0)
for _, c := range cleanups {
// Delete the messed up episodes from queue
for _, m := range c {
queueIds = append(queueIds, m.id)
episodesIds = append(episodesIds, m.episodeId)
}
}

View File

@@ -10,7 +10,11 @@ import (
)
func (a *Arr) Refresh() error {
payload := map[string]string{"name": "RefreshMonitoredDownloads"}
payload := struct {
Name string `json:"name"`
}{
Name: "RefreshMonitoredDownloads",
}
resp, err := a.Request(http.MethodPost, "api/v3/command", payload)
if err == nil && resp != nil {
@@ -19,7 +23,8 @@ func (a *Arr) Refresh() error {
return nil
}
}
return fmt.Errorf("failed to refresh monitored downloads for %s", cmp.Or(a.Name, a.Host))
return fmt.Errorf("failed to refresh: %v", err)
}
func (a *Arr) Blacklist(infoHash string) error {

View File

@@ -10,6 +10,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
"slices"
"net/http"
gourl "net/url"
@@ -192,9 +193,8 @@ func (ad *AllDebrid) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*tor
}
}
break
} else if status == "downloading" {
} else if slices.Contains(ad.GetDownloadingStatus(), status) {
if !ad.DownloadUncached {
go ad.DeleteTorrent(torrent)
return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name)
}
// Break out of the loop if the torrent is downloading.
@@ -278,6 +278,10 @@ func (ad *AllDebrid) GetTorrents() ([]*torrent.Torrent, error) {
return nil, fmt.Errorf("not implemented")
}
func (ad *AllDebrid) GetDownloadingStatus() []string {
return []string{"downloading"}
}
func New(dc config.Debrid, cache *cache.Cache) *AllDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{

View File

@@ -11,6 +11,7 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
"slices"
"net/http"
"os"
@@ -109,7 +110,7 @@ func (dl *DebridLink) GetTorrent(id string) (*torrent.Torrent, error) {
if err != nil {
return t, err
}
if res.Success == false {
if !res.Success {
return t, fmt.Errorf("error getting torrent")
}
if res.Value == nil {
@@ -167,7 +168,7 @@ func (dl *DebridLink) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
if err != nil {
return nil, err
}
if res.Success == false || res.Value == nil {
if !res.Success || res.Value == nil {
return nil, fmt.Errorf("error adding torrent")
}
data := *res.Value
@@ -216,9 +217,8 @@ func (dl *DebridLink) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*to
return torrent, err
}
break
} else if status == "downloading" {
} else if slices.Contains(dl.GetDownloadingStatus(), status) {
if !dl.DownloadUncached {
go dl.DeleteTorrent(torrent)
return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name)
}
// Break out of the loop if the torrent is downloading.
@@ -264,6 +264,10 @@ func (dl *DebridLink) GetDownloadLink(t *torrent.Torrent, file *torrent.File) *t
return &dlLink
}
func (dl *DebridLink) GetDownloadingStatus() []string {
return []string{"downloading"}
}
func (dl *DebridLink) GetCheckCached() bool {
return dl.CheckCached
}

View File

@@ -1 +0,0 @@
package debrid

View File

@@ -17,4 +17,5 @@ type Service interface {
GetTorrents() ([]*torrent.Torrent, error)
GetName() string
GetLogger() zerolog.Logger
GetDownloadingStatus() []string
}

View File

@@ -151,7 +151,9 @@ func (r *RealDebrid) SubmitMagnet(t *torrent.Torrent) (*torrent.Torrent, error)
if err != nil {
return nil, err
}
err = json.Unmarshal(resp, &data)
if err = json.Unmarshal(resp, &data); err != nil {
return nil, err
}
t.Id = data.Id
t.Debrid = r.Name
t.MountPath = r.MountPath
@@ -201,7 +203,9 @@ func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.T
return t, err
}
var data TorrentInfo
err = json.Unmarshal(resp, &data)
if err = json.Unmarshal(resp, &data); err != nil {
return t, err
}
status := data.Status
name := utils.RemoveInvalidChars(data.OriginalFilename)
t.Name = name // Important because some magnet changes the name
@@ -216,7 +220,6 @@ func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.T
t.Status = status
t.Debrid = r.Name
t.MountPath = r.MountPath
downloadingStatus := []string{"downloading", "magnet_conversion", "queued", "compressing", "uploading"}
if status == "waiting_files_selection" {
files := GetTorrentFiles(data, true) // Validate files to be selected
t.Files = files
@@ -247,9 +250,8 @@ func (r *RealDebrid) CheckStatus(t *torrent.Torrent, isSymlink bool) (*torrent.T
}
}
break
} else if slices.Contains(downloadingStatus, status) {
} else if slices.Contains(r.GetDownloadingStatus(), status) {
if !r.DownloadUncached {
go r.DeleteTorrent(t)
return t, fmt.Errorf("torrent: %s not cached", t.Name)
}
// Break out of the loop if the torrent is downloading.
@@ -380,6 +382,10 @@ func (r *RealDebrid) GetTorrents() ([]*torrent.Torrent, error) {
}
func (r *RealDebrid) GetDownloadingStatus() []string {
return []string{"downloading", "magnet_conversion", "queued", "compressing", "uploading"}
}
func New(dc config.Debrid, cache *cache.Cache) *RealDebrid {
rl := request.ParseRateLimit(dc.RateLimit)
headers := map[string]string{

View File

@@ -232,9 +232,8 @@ func (tb *Torbox) CheckStatus(torrent *torrent.Torrent, isSymlink bool) (*torren
}
}
break
} else if status == "downloading" {
} else if slices.Contains(tb.GetDownloadingStatus(), status) {
if !tb.DownloadUncached {
go tb.DeleteTorrent(torrent)
return torrent, fmt.Errorf("torrent: %s not cached", torrent.Name)
}
// Break out of the loop if the torrent is downloading.
@@ -323,6 +322,10 @@ func (tb *Torbox) GetDownloadLink(t *torrent.Torrent, file *torrent.File) *torre
}
}
func (tb *Torbox) GetDownloadingStatus() []string {
return []string{"downloading"}
}
func (tb *Torbox) GetCheckCached() bool {
return tb.CheckCached
}

View File

@@ -1,2 +0,0 @@
package downloader

View File

@@ -46,8 +46,7 @@ func (q *QBit) CategoryContext(next http.Handler) http.Handler {
category = r.FormValue("category")
}
}
ctx := r.Context()
ctx = context.WithValue(r.Context(), "category", strings.TrimSpace(category))
ctx := context.WithValue(r.Context(), "category", strings.TrimSpace(category))
next.ServeHTTP(w, r.WithContext(ctx))
})
}
@@ -63,8 +62,14 @@ func (q *QBit) authContext(next http.Handler) http.Handler {
a = arr.New(category, "", "", false)
}
if err == nil {
a.Host = strings.TrimSpace(host)
a.Token = strings.TrimSpace(token)
host = strings.TrimSpace(host)
if host != "" {
a.Host = host
}
token = strings.TrimSpace(token)
if token != "" {
a.Token = token
}
}
svc.Arr.AddOrUpdate(a)
@@ -94,6 +99,16 @@ func HashesCtx(next http.Handler) http.Handler {
}
func (q *QBit) handleLogin(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
_arr := ctx.Value("arr").(*arr.Arr)
if _arr == nil {
// No arr
_, _ = w.Write([]byte("Ok."))
return
}
if err := _arr.Validate(); err != nil {
q.logger.Info().Msgf("Error validating arr: %v", err)
}
_, _ = w.Write([]byte("Ok."))
}

View File

@@ -74,7 +74,6 @@ func (i *ImportRequest) Process(q *QBit) (err error) {
torrent := CreateTorrentFromMagnet(magnet, i.Arr.Name, "manual")
debridTorrent, err := debrid.ProcessTorrent(svc.Debrid, magnet, i.Arr, i.IsSymlink)
if err != nil || debridTorrent == nil {
fmt.Println("Error deleting torrent: ", err)
if debridTorrent != nil {
dbClient := service.GetDebrid().GetByName(debridTorrent.Debrid)
go dbClient.DeleteTorrent(debridTorrent)

View File

@@ -3,31 +3,9 @@ package qbit
import (
"github.com/google/uuid"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
debrid "github.com/sirrobot01/debrid-blackhole/pkg/debrid/torrent"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
func checkFileLoop(wg *sync.WaitGroup, dir string, file debrid.File, ready chan<- debrid.File) {
defer wg.Done()
ticker := time.NewTicker(1 * time.Second) // Check every second
defer ticker.Stop()
path := filepath.Join(dir, file.Path)
for {
select {
case <-ticker.C:
_, err := os.Stat(path)
if !os.IsNotExist(err) {
ready <- file
return
}
}
}
}
func CreateTorrentFromMagnet(magnet *utils.Magnet, category, source string) *Torrent {
torrent := &Torrent{
ID: uuid.NewString(),

View File

@@ -16,7 +16,6 @@ type QBit struct {
DownloadFolder string `json:"download_folder"`
Categories []string `json:"categories"`
Storage *TorrentStorage
debug bool
logger zerolog.Logger
Tags []string
RefreshInterval int

View File

@@ -8,10 +8,9 @@ import (
func (q *QBit) Routes() http.Handler {
r := chi.NewRouter()
r.Use(q.CategoryContext)
r.Post("/auth/login", q.handleLogin)
r.Group(func(r chi.Router) {
r.Use(q.authContext)
r.Post("/auth/login", q.handleLogin)
r.Route("/torrents", func(r chi.Router) {
r.Use(HashesCtx)
r.Get("/info", q.handleTorrentsInfo)

View File

@@ -51,14 +51,24 @@ func (ts *TorrentStorage) Add(torrent *Torrent) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.torrents[keyPair(torrent.Hash, torrent.Category)] = torrent
_ = ts.saveToFile()
go func() {
err := ts.saveToFile()
if err != nil {
fmt.Println(err)
}
}()
}
func (ts *TorrentStorage) AddOrUpdate(torrent *Torrent) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.torrents[keyPair(torrent.Hash, torrent.Category)] = torrent
_ = ts.saveToFile()
go func() {
err := ts.saveToFile()
if err != nil {
fmt.Println(err)
}
}()
}
func (ts *TorrentStorage) Get(hash, category string) *Torrent {
@@ -108,7 +118,12 @@ func (ts *TorrentStorage) Update(torrent *Torrent) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.torrents[keyPair(torrent.Hash, torrent.Category)] = torrent
_ = ts.saveToFile()
go func() {
err := ts.saveToFile()
if err != nil {
fmt.Println(err)
}
}()
}
func (ts *TorrentStorage) Delete(hash, category string) {
@@ -127,6 +142,9 @@ func (ts *TorrentStorage) Delete(hash, category string) {
}
}
delete(ts.torrents, key)
if torrent == nil {
return
}
// Delete the torrent folder
if torrent.ContentPath != "" {
err := os.RemoveAll(torrent.ContentPath)
@@ -134,7 +152,12 @@ func (ts *TorrentStorage) Delete(hash, category string) {
return
}
}
_ = ts.saveToFile()
go func() {
err := ts.saveToFile()
if err != nil {
fmt.Println(err)
}
}()
}
func (ts *TorrentStorage) DeleteMultiple(hashes []string) {
@@ -147,7 +170,12 @@ func (ts *TorrentStorage) DeleteMultiple(hashes []string) {
}
}
}
_ = ts.saveToFile()
go func() {
err := ts.saveToFile()
if err != nil {
fmt.Println(err)
}
}()
}
func (ts *TorrentStorage) Save() error {

View File

@@ -75,19 +75,26 @@ func (q *QBit) Process(ctx context.Context, magnet *utils.Magnet, category strin
func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *arr.Arr, isSymlink bool) {
debridClient := service.GetDebrid().GetByName(debridTorrent.Debrid)
for debridTorrent.Status != "downloaded" {
progress := debridTorrent.Progress
q.logger.Debug().Msgf("%s -> (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, progress)
time.Sleep(10 * time.Second)
q.logger.Debug().Msgf("%s <- (%s) Download Progress: %.2f%%", debridTorrent.Debrid, debridTorrent.Name, debridTorrent.Progress)
dbT, err := debridClient.CheckStatus(debridTorrent, isSymlink)
if err != nil {
q.logger.Error().Msgf("Error checking status: %v", err)
go debridClient.DeleteTorrent(debridTorrent)
q.MarkAsFailed(torrent)
_ = arr.Refresh()
if err := arr.Refresh(); err != nil {
q.logger.Error().Msgf("Error refreshing arr: %v", err)
}
return
}
debridTorrent = dbT
torrent = q.UpdateTorrentMin(torrent, debridTorrent)
// Exit the loop for downloading statuses to prevent memory buildup
if !slices.Contains(debridClient.GetDownloadingStatus(), debridTorrent.Status) {
break
}
time.Sleep(time.Duration(q.RefreshInterval) * time.Second)
}
var (
torrentSymlinkPath string
@@ -107,7 +114,9 @@ func (q *QBit) ProcessFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr
}
torrent.TorrentPath = torrentSymlinkPath
q.UpdateTorrent(torrent, debridTorrent)
_ = arr.Refresh()
if err := arr.Refresh(); err != nil {
q.logger.Error().Msgf("Error refreshing arr: %v", err)
}
}
func (q *QBit) MarkAsFailed(t *Torrent) *Torrent {
@@ -160,14 +169,10 @@ func (q *QBit) UpdateTorrentMin(t *Torrent, debridTorrent *debrid.Torrent) *Torr
}
func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent {
_db := service.GetDebrid().GetByName(debridTorrent.Debrid)
if debridTorrent == nil && t.ID != "" {
debridTorrent, _ = _db.GetTorrent(t.ID)
}
if debridTorrent == nil {
q.logger.Info().Msgf("Torrent with ID %s not found in %s", t.ID, _db.GetName())
return t
}
_db := service.GetDebrid().GetByName(debridTorrent.Debrid)
if debridTorrent.Status != "downloaded" {
debridTorrent, _ = _db.GetTorrent(t.ID)
}
@@ -180,7 +185,7 @@ func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent
return t
}
ticker := time.NewTicker(2 * time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {

View File

@@ -1,39 +0,0 @@
package qbit
import (
"context"
"github.com/sirrobot01/debrid-blackhole/pkg/service"
"time"
)
func (q *QBit) StartWorker(ctx context.Context) {
q.logger.Info().Msg("Qbit Worker started")
q.StartRefreshWorker(ctx)
}
func (q *QBit) StartRefreshWorker(ctx context.Context) {
refreshCtx := context.WithValue(ctx, "worker", "refresh")
refreshTicker := time.NewTicker(time.Duration(q.RefreshInterval) * time.Second)
for {
select {
case <-refreshCtx.Done():
q.logger.Info().Msg("Qbit Refresh Worker stopped")
return
case <-refreshTicker.C:
torrents := q.Storage.GetAll("", "", nil)
if len(torrents) > 0 {
q.RefreshArrs()
}
}
}
}
func (q *QBit) RefreshArrs() {
arrs := service.GetService().Arr
for _, arr := range arrs.GetAll() {
err := arr.Refresh()
if err != nil {
return
}
}
}

View File

@@ -1,283 +0,0 @@
package rclone
import (
"bufio"
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/pkg/webdav"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
type Remote struct {
Type string `json:"type"`
Name string `json:"name"`
Url string `json:"url"`
MountPoint string `json:"mount_point"`
Flags map[string]string `json:"flags"`
}
func (rc *Rclone) Config() string {
var content string
for _, remote := range rc.Remotes {
content += fmt.Sprintf("[%s]\n", remote.Name)
content += fmt.Sprintf("type = %s\n", remote.Type)
content += fmt.Sprintf("url = %s\n", remote.Url)
content += fmt.Sprintf("vendor = other\n")
for key, value := range remote.Flags {
content += fmt.Sprintf("%s = %s\n", key, value)
}
content += "\n\n"
}
return content
}
type Rclone struct {
Remotes map[string]Remote `json:"remotes"`
logger zerolog.Logger
cmd *exec.Cmd
configPath string
}
func New(webdav *webdav.WebDav) (*Rclone, error) {
// Check if rclone is installed
cfg := config.GetConfig()
configPath := fmt.Sprintf("%s/rclone.conf", cfg.Path)
if _, err := exec.LookPath("rclone"); err != nil {
return nil, fmt.Errorf("rclone is not installed: %w", err)
}
remotes := make(map[string]Remote)
for _, handler := range webdav.Handlers {
url := fmt.Sprintf("http://localhost:%s/webdav/%s/", cfg.QBitTorrent.Port, strings.ToLower(handler.Name))
rmt := Remote{
Type: "webdav",
Name: handler.Name,
Url: url,
MountPoint: filepath.Join("/mnt/rclone/", handler.Name),
Flags: map[string]string{},
}
remotes[handler.Name] = rmt
}
rc := &Rclone{
logger: logger.NewLogger("rclone", "info", os.Stdout),
Remotes: remotes,
configPath: configPath,
}
if err := rc.WriteConfig(); err != nil {
return nil, err
}
return rc, nil
}
func (rc *Rclone) WriteConfig() error {
// Create config directory if it doesn't exist
configDir := filepath.Dir(rc.configPath)
if err := os.MkdirAll(configDir, 0755); err != nil {
return fmt.Errorf("failed to create config directory: %w", err)
}
// Write the config file
if err := os.WriteFile(rc.configPath, []byte(rc.Config()), 0600); err != nil {
return fmt.Errorf("failed to write config file: %w", err)
}
rc.logger.Info().Msgf("Wrote rclone config with %d remotes to %s", len(rc.Remotes), rc.configPath)
return nil
}
func (rc *Rclone) Start(ctx context.Context) error {
var wg sync.WaitGroup
errChan := make(chan error)
for _, remote := range rc.Remotes {
wg.Add(1)
go func(remote Remote) {
defer wg.Done()
if err := rc.Mount(ctx, &remote); err != nil {
rc.logger.Error().Err(err).Msgf("failed to mount %s", remote.Name)
select {
case errChan <- err:
default:
}
}
}(remote)
}
return <-errChan
}
func (rc *Rclone) testConnection(ctx context.Context, remote *Remote) error {
testArgs := []string{
"ls",
"--config", rc.configPath,
"--log-level", "DEBUG",
remote.Name + ":",
}
cmd := exec.CommandContext(ctx, "rclone", testArgs...)
output, err := cmd.CombinedOutput()
if err != nil {
rc.logger.Error().Err(err).Str("output", string(output)).Msg("Connection test failed")
return fmt.Errorf("connection test failed: %w", err)
}
rc.logger.Info().Msg("Connection test successful")
return nil
}
func (rc *Rclone) Mount(ctx context.Context, remote *Remote) error {
// Ensure the mount point directory exists
if err := os.MkdirAll(remote.MountPoint, 0755); err != nil {
rc.logger.Info().Err(err).Msgf("failed to create mount point directory: %s", remote.MountPoint)
return err
}
//if err := rc.testConnection(ctx, remote); err != nil {
// return err
//}
// Basic arguments
args := []string{
"mount",
remote.Name + ":",
remote.MountPoint,
"--config", rc.configPath,
"--vfs-cache-mode", "full",
"--log-level", "DEBUG", // Keep this, remove -vv
"--allow-other", // Keep this
"--allow-root", // Add this
"--default-permissions", // Add this
"--vfs-cache-max-age", "24h",
"--timeout", "1m",
"--transfers", "4",
"--buffer-size", "32M",
}
// Add any additional flags
for key, value := range remote.Flags {
args = append(args, "--"+key, value)
}
// Create command
rc.cmd = exec.CommandContext(ctx, "rclone", args...)
// Set up pipes for stdout and stderr
stdout, err := rc.cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := rc.cmd.StderrPipe()
if err != nil {
return err
}
// Start the command
if err := rc.cmd.Start(); err != nil {
return err
}
// Channel to signal mount success
mountReady := make(chan bool)
mountError := make(chan error)
// Monitor stdout
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
text := scanner.Text()
rc.logger.Info().Msg("stdout: " + text)
if strings.Contains(text, "Mount succeeded") {
mountReady <- true
return
}
}
}()
// Monitor stderr
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
text := scanner.Text()
rc.logger.Info().Msg("stderr: " + text)
if strings.Contains(text, "error") {
mountError <- fmt.Errorf("mount error: %s", text)
return
}
}
}()
// Wait for mount with timeout
select {
case <-mountReady:
rc.logger.Info().Msgf("Successfully mounted %s at %s", remote.Name, remote.MountPoint)
return nil
case err := <-mountError:
err = rc.cmd.Process.Kill()
if err != nil {
return err
}
return err
case <-ctx.Done():
err := rc.cmd.Process.Kill()
if err != nil {
return err
}
return ctx.Err()
case <-time.After(30 * time.Second):
err := rc.cmd.Process.Kill()
if err != nil {
return err
}
return fmt.Errorf("mount timeout after 30 seconds")
}
}
func (rc *Rclone) Unmount(ctx context.Context, remote *Remote) error {
if rc.cmd != nil && rc.cmd.Process != nil {
// First try graceful shutdown
if err := rc.cmd.Process.Signal(os.Interrupt); err != nil {
rc.logger.Warn().Err(err).Msg("failed to send interrupt signal")
}
// Wait for a bit to allow graceful shutdown
done := make(chan error)
go func() {
done <- rc.cmd.Wait()
}()
select {
case err := <-done:
if err != nil {
rc.logger.Warn().Err(err).Msg("process exited with error")
}
case <-time.After(5 * time.Second):
// Force kill if it doesn't shut down gracefully
if err := rc.cmd.Process.Kill(); err != nil {
rc.logger.Error().Err(err).Msg("failed to kill process")
return err
}
}
}
// Use fusermount to ensure the mountpoint is unmounted
cmd := exec.CommandContext(ctx, "fusermount", "-u", remote.MountPoint)
if err := cmd.Run(); err != nil {
rc.logger.Warn().Err(err).Msg("fusermount unmount failed")
// Don't return error here as the process might already be dead
}
rc.logger.Info().Msgf("Successfully unmounted %s", remote.MountPoint)
return nil
}

View File

@@ -178,13 +178,13 @@ func (r *Repair) RepairArr(a *arr.Arr, tmdbId string) error {
r.logger.Info().Msgf("Starting repair for %s", a.Name)
media, err := a.GetMedia(tmdbId)
if err != nil {
r.logger.Info().Msgf("Failed to get %s media: %v", a.Type, err)
r.logger.Info().Msgf("Failed to get %s media: %v", a.Name, err)
return err
}
r.logger.Info().Msgf("Found %d %s media", len(media), a.Type)
r.logger.Info().Msgf("Found %d %s media", len(media), a.Name)
if len(media) == 0 {
r.logger.Info().Msgf("No %s media found", a.Type)
r.logger.Info().Msgf("No %s media found", a.Name)
return nil
}
// Check first media to confirm mounts are accessible

View File

@@ -436,7 +436,7 @@ func (ui *Handler) handleGetConfig(w http.ResponseWriter, r *http.Request) {
arrCfgs := make([]config.Arr, 0)
svc := service.GetService()
for _, a := range svc.Arr.GetAll() {
arrCfgs = append(arrCfgs, config.Arr{Host: a.Host, Name: a.Name, Token: a.Token})
arrCfgs = append(arrCfgs, config.Arr{Host: a.Host, Name: a.Name, Token: a.Token, Cleanup: a.Cleanup})
}
cfg.Arrs = arrCfgs
request.JSONResponse(w, cfg, http.StatusOK)

View File

@@ -126,19 +126,27 @@
<div class="section mb-5">
<h5 class="border-bottom pb-2">Repair Configuration</h5>
<div class="row">
<div class="col-md-6 mb-3">
<div class="col-md-3 mb-3">
<label class="form-label">Interval</label>
<input type="text" disabled class="form-control" name="repair.interval" placeholder="e.g., 24h">
</div>
<div class="col-12">
<div class="form-check mb-2">
<input type="checkbox" disabled class="form-check-input" name="repair.enabled" id="repairEnabled">
<label class="form-check-label" for="repairEnabled">Enable Repair</label>
</div>
<div class="form-check">
<input type="checkbox" disabled class="form-check-input" name="repair.run_on_start" id="repairOnStart">
<label class="form-check-label" for="repairOnStart">Run on Start</label>
</div>
<div class="col-md-4 mb-3">
<label class="form-label">Zurg URL</label>
<input type="text" disabled class="form-control" name="repair.zurg_url" placeholder="http://zurg:9999">
</div>
</div>
<div class="col-12">
<div class="form-check me-3 d-inline-block">
<input type="checkbox" disabled class="form-check-input" name="repair.enabled" id="repairEnabled">
<label class="form-check-label" for="repairEnabled">Enable Repair</label>
</div>
<div class="form-check me-3 d-inline-block">
<input type="checkbox" disabled class="form-check-input" name="repair.run_on_start" id="repairOnStart">
<label class="form-check-label" for="repairOnStart">Run on Start</label>
</div>
<div class="form-check d-inline-block">
<input type="checkbox" disabled class="form-check-input" name="repair.skip_deletion" id="skipDeletion">
<label class="form-check-label" for="skipDeletion">Run on Start</label>
</div>
</div>
</div>
@@ -201,6 +209,14 @@
<input type="password" disabled class="form-control" name="arr[${index}].token" required>
</div>
</div>
<div class="row">
<div class="col-md-2 mb-3">
<div class="form-check">
<label class="form-check-label" for="repairOnStart">Cleanup Queue</label>
<input type="checkbox" disabled class="form-check-input" name="arr[${index}].cleanup">
</div>
</div>
</div>
</div>
`;

View File

@@ -114,10 +114,9 @@
}
} else {
createToast(`Successfully added ${result.results.length} torrents!`);
document.getElementById('magnetURI').value = '';
document.getElementById('torrentFiles').value = '';
}
document.getElementById('magnetURI').value = '';
document.getElementById('torrentFiles').value = '';
} catch (error) {
createToast(`Error adding downloads: ${error.message}`, 'error');
} finally {

View File

@@ -5,7 +5,6 @@ import (
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/pkg/arr"
"github.com/sirrobot01/debrid-blackhole/pkg/service"
"os"
"sync"
@@ -31,13 +30,6 @@ func Start(ctx context.Context) error {
// Start Arr Refresh Worker
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
arrRefreshWorker(ctx, cfg)
}()
wg.Add(1)
go func() {
defer wg.Done()
@@ -47,46 +39,27 @@ func Start(ctx context.Context) error {
return nil
}
func arrRefreshWorker(ctx context.Context, cfg *config.Config) {
// Start Arr Refresh Worker
_logger := getLogger()
_logger.Debug().Msg("Refresh Worker started")
refreshCtx := context.WithValue(ctx, "worker", "refresh")
refreshTicker := time.NewTicker(time.Duration(cfg.QBitTorrent.RefreshInterval) * time.Second)
var refreshMutex sync.Mutex
for {
select {
case <-refreshCtx.Done():
_logger.Debug().Msg("Refresh Worker stopped")
return
case <-refreshTicker.C:
if refreshMutex.TryLock() {
go func() {
defer refreshMutex.Unlock()
refreshArrs()
}()
} else {
_logger.Debug().Msg("Previous refresh still running, skipping this cycle")
}
}
}
}
//func arrRefreshWorker(ctx context.Context, cfg *config.Config) {
// // Start Arr Refresh Worker
// _logger := getLogger()
// _logger.Debug().Msg("Refresh Worker started")
// refreshCtx := context.WithValue(ctx, "worker", "refresh")
// refreshTicker := time.NewTicker(time.Duration(cfg.QBitTorrent.RefreshInterval) * time.Second)
//
// for {
// select {
// case <-refreshCtx.Done():
// _logger.Debug().Msg("Refresh Worker stopped")
// return
// case <-refreshTicker.C:
// refreshArrs()
// }
// }
//}
func cleanUpQueuesWorker(ctx context.Context, cfg *config.Config) {
// Start Clean up Queues Worker
_logger := getLogger()
_arrs := service.GetService().Arr
filtered := make([]*arr.Arr, 0)
for _, a := range _arrs.GetAll() {
if a.Cleanup {
filtered = append(filtered, a)
}
}
if len(filtered) == 0 {
_logger.Debug().Msg("No ARR instances configured for cleanup")
return
}
_logger.Debug().Msg("Clean up Queues Worker started")
cleanupCtx := context.WithValue(ctx, "worker", "cleanup")
cleanupTicker := time.NewTicker(time.Duration(10) * time.Second)
@@ -102,27 +75,31 @@ func cleanUpQueuesWorker(ctx context.Context, cfg *config.Config) {
if cleanupMutex.TryLock() {
go func() {
defer cleanupMutex.Unlock()
cleanUpQueues(filtered)
cleanUpQueues()
}()
}
}
}
}
func refreshArrs() {
arrs := service.GetService().Arr
for _, arr := range arrs.GetAll() {
err := arr.Refresh()
if err != nil {
return
}
}
}
//func refreshArrs() {
// for _, a := range service.GetService().Arr.GetAll() {
// err := a.Refresh()
// if err != nil {
// _logger := getLogger()
// _logger.Debug().Err(err).Msg("Error refreshing arr")
// return
// }
// }
//}
func cleanUpQueues(arrs []*arr.Arr) {
func cleanUpQueues() {
// Clean up queues
_logger := getLogger()
for _, a := range arrs {
for _, a := range service.GetService().Arr.GetAll() {
if !a.Cleanup {
continue
}
_logger.Debug().Msgf("Cleaning up queue for %s", a.Name)
if err := a.CleanupQueue(); err != nil {
_logger.Debug().Err(err).Msg("Error cleaning up queue")