Changelog 0.2.2

This commit is contained in:
Mukhtar Akere
2024-09-15 03:33:28 +01:00
parent d5e07dc961
commit 329e4c60f5
22 changed files with 445 additions and 245 deletions

101
pkg/qbit/arr.go Normal file
View File

@@ -0,0 +1,101 @@
package qbit
import (
"bytes"
"cmp"
"encoding/json"
"goBlack/common"
"goBlack/pkg/debrid"
"net/http"
gourl "net/url"
"strconv"
"strings"
)
func (q *QBit) RefreshArr(arr *debrid.Arr) {
if arr.Token == "" || arr.Host == "" {
return
}
url, err := common.JoinURL(arr.Host, "api/v3/command")
if err != nil {
return
}
payload := map[string]string{"name": "RefreshMonitoredDownloads"}
jsonPayload, err := json.Marshal(payload)
if err != nil {
return
}
client := &http.Client{}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Api-Key", arr.Token)
resp, reqErr := client.Do(req)
if reqErr == nil {
statusOk := strconv.Itoa(resp.StatusCode)[0] == '2'
if statusOk {
q.logger.Printf("Refreshed monitored downloads for %s", cmp.Or(arr.Name, arr.Host))
}
}
if reqErr != nil {
}
}
func (q *QBit) GetArrHistory(arr *debrid.Arr, downloadId, eventType string) *debrid.ArrHistorySchema {
query := gourl.Values{}
if downloadId != "" {
query.Add("downloadId", downloadId)
}
query.Add("eventType", eventType)
query.Add("pageSize", "100")
url, _ := common.JoinURL(arr.Host, "history")
url += "?" + query.Encode()
resp, err := http.Get(url)
if err != nil {
return nil
}
var data *debrid.ArrHistorySchema
if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil
}
return data
}
func (q *QBit) MarkArrAsFailed(torrent *Torrent, arr *debrid.Arr) error {
downloadId := strings.ToUpper(torrent.Hash)
history := q.GetArrHistory(arr, downloadId, "grabbed")
if history == nil {
return nil
}
torrentId := 0
for _, record := range history.Records {
if strings.EqualFold(record.DownloadID, downloadId) {
torrentId = record.ID
break
}
}
if torrentId != 0 {
url, err := common.JoinURL(arr.Host, "history/failed/", strconv.Itoa(torrentId))
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return err
}
client := &http.Client{}
_, err = client.Do(req)
if err == nil {
q.logger.Printf("Marked torrent: %s as failed", torrent.Name)
}
}
return nil
}

View File

@@ -10,7 +10,8 @@ func (q *QBit) AddRoutes(r chi.Router) http.Handler {
r.Post("/auth/login", q.handleLogin)
r.Group(func(r chi.Router) {
r.Use(q.authMiddleware)
//r.Use(q.authMiddleware)
r.Use(q.authContext)
r.Route("/torrents", func(r chi.Router) {
r.Use(HashesCtx)
r.Get("/info", q.handleTorrentsInfo)

View File

@@ -2,49 +2,9 @@ package qbit
import (
"net/http"
"time"
)
func (q *QBit) handleLogin(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
http.Error(w, "Failed to parse form data", http.StatusBadRequest)
return
}
username := r.Form.Get("username")
password := r.Form.Get("password")
// In a real implementation, you'd verify credentials here
// For this mock, we'll accept any non-empty username and password
if username == "" || password == "" {
http.Error(w, "Invalid credentials", http.StatusUnauthorized)
return
}
if username != q.Username || password != q.Password {
http.Error(w, "Invalid credentials", http.StatusUnauthorized)
return
}
// Generate a new SID
sid, err := generateSID()
if err != nil {
http.Error(w, "Failed to generate session ID", http.StatusInternalServerError)
return
}
// Set the SID cookie
http.SetCookie(w, &http.Cookie{
Name: cookieName,
Value: sid,
Path: "/",
HttpOnly: true,
MaxAge: 315360000,
})
// Store the session
sessions.Store(sid, time.Now().Add(24*time.Hour))
w.WriteHeader(http.StatusOK)
w.Write([]byte("Ok."))
}

View File

@@ -19,6 +19,7 @@ func (q *QBit) handleTorrentsInfo(w http.ResponseWriter, r *http.Request) {
}
func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
contentType := strings.Split(r.Header.Get("Content-Type"), ";")[0]
switch contentType {
case "multipart/form-data":
@@ -52,7 +53,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go q.Process(magnet, category)
go q.Process(ctx, magnet, category)
}
@@ -68,7 +69,7 @@ func (q *QBit) handleTorrentsAdd(w http.ResponseWriter, r *http.Request) {
q.logger.Printf("Error reading file: %s", fileHeader.Filename)
return
}
go q.Process(magnet, category)
go q.Process(ctx, magnet, category)
}
}

View File

@@ -2,6 +2,7 @@ package qbit
import (
"cmp"
"context"
"fmt"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
@@ -10,46 +11,51 @@ import (
"log"
"net/http"
"os"
"sync"
"time"
)
type QBit struct {
Username string `json:"username"`
Password string `json:"password"`
Port string `json:"port"`
DownloadFolder string `json:"download_folder"`
Categories []string `json:"categories"`
debrid debrid.Service
cache *common.Cache
storage *TorrentStorage
debug bool
logger *log.Logger
type WorkerType struct {
ticker *time.Ticker
ctx context.Context
}
var (
sessions sync.Map
)
type Worker struct {
types map[string]WorkerType
}
const (
sidLength = 32
cookieName = "SID"
)
type QBit struct {
Username string `json:"username"`
Password string `json:"password"`
Port string `json:"port"`
DownloadFolder string `json:"download_folder"`
Categories []string `json:"categories"`
debrid debrid.Service
cache *common.Cache
storage *TorrentStorage
debug bool
logger *log.Logger
arrs map[string]string // host:token (Used for refreshing in worker)
RefreshInterval int
}
func NewQBit(config *common.Config, deb debrid.Service, cache *common.Cache) *QBit {
cfg := config.QBitTorrent
storage := NewTorrentStorage("torrents.json")
port := cmp.Or(cfg.Port, os.Getenv("QBIT_PORT"), "8182")
refreshInterval := cmp.Or(cfg.RefreshInterval, 10)
return &QBit{
Username: cfg.Username,
Password: cfg.Password,
Port: port,
DownloadFolder: cfg.DownloadFolder,
Categories: cfg.Categories,
debrid: deb,
cache: cache,
debug: cfg.Debug,
storage: storage,
logger: common.NewLogger("QBit", os.Stdout),
Username: cfg.Username,
Password: cfg.Password,
Port: port,
DownloadFolder: cfg.DownloadFolder,
Categories: cfg.Categories,
debrid: deb,
cache: cache,
debug: cfg.Debug,
storage: storage,
logger: common.NewLogger("QBit", os.Stdout),
arrs: make(map[string]string),
RefreshInterval: refreshInterval,
}
}
@@ -61,6 +67,10 @@ func (q *QBit) Start() {
q.AddRoutes(r)
ctx := context.Background()
go q.StartWorker(ctx)
q.logger.Printf("Starting QBit server on :%s", q.Port)
port := fmt.Sprintf(":%s", q.Port)
q.logger.Fatal(http.ListenAndServe(port, r))

View File

@@ -3,6 +3,7 @@ package qbit
import (
"context"
"crypto/subtle"
"encoding/base64"
"github.com/go-chi/chi/v5"
"net/http"
"strings"
@@ -23,6 +24,42 @@ func (q *QBit) authMiddleware(next http.Handler) http.Handler {
})
}
func DecodeAuthHeader(header string) (string, string, error) {
encodedTokens := strings.Split(header, " ")
if len(encodedTokens) != 2 {
return "", "", nil
}
encodedToken := encodedTokens[1]
bytes, err := base64.StdEncoding.DecodeString(encodedToken)
if err != nil {
return "", "", err
}
bearer := string(bytes)
colonIndex := strings.LastIndex(bearer, ":")
host := bearer[:colonIndex]
token := bearer[colonIndex+1:]
return host, token, nil
}
func (q *QBit) authContext(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
host, token, err := DecodeAuthHeader(r.Header.Get("Authorization"))
ctx := r.Context()
if err == nil {
ctx = context.WithValue(r.Context(), "host", host)
ctx = context.WithValue(ctx, "token", token)
q.arrs[host] = token
next.ServeHTTP(w, r.WithContext(ctx))
return
}
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func HashesCtx(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_hashes := chi.URLParam(r, "hashes")

View File

@@ -1,6 +1,7 @@
package qbit
import (
"context"
"github.com/google/uuid"
"goBlack/common"
"goBlack/pkg/debrid"
@@ -11,10 +12,15 @@ import (
"time"
)
func (q *QBit) Process(magnet *common.Magnet, category string) (*Torrent, error) {
func (q *QBit) Process(ctx context.Context, magnet *common.Magnet, category string) (*Torrent, error) {
torrent := q.CreateTorrentFromMagnet(magnet, category)
go q.storage.AddOrUpdate(torrent)
debridTorrent, err := debrid.ProcessQBitTorrent(q.debrid, magnet, category)
arr := &debrid.Arr{
Name: category,
Token: ctx.Value("token").(string),
Host: ctx.Value("host").(string),
}
debridTorrent, err := debrid.ProcessQBitTorrent(q.debrid, magnet, arr)
if err != nil || debridTorrent == nil {
// Mark as failed
q.logger.Printf("Failed to process torrent: %s: %v", magnet.Name, err)
@@ -22,9 +28,9 @@ func (q *QBit) Process(magnet *common.Magnet, category string) (*Torrent, error)
return torrent, err
}
torrent.ID = debridTorrent.Id
torrent.Name = debridTorrent.Name // Update the name before adding it to *arrs storage
torrent.DebridTorrent = debridTorrent
go q.processFiles(torrent, debridTorrent)
torrent.Name = debridTorrent.Name
q.processFiles(torrent, debridTorrent, arr)
return torrent, nil
}
@@ -57,22 +63,29 @@ func (q *QBit) CreateTorrentFromMagnet(magnet *common.Magnet, category string) *
return torrent
}
func (q *QBit) processFiles(torrent *Torrent, debridTorrent *debrid.Torrent) {
func (q *QBit) processFiles(torrent *Torrent, debridTorrent *debrid.Torrent, arr *debrid.Arr) {
var wg sync.WaitGroup
files := debridTorrent.Files
ready := make(chan debrid.TorrentFile, len(files))
q.logger.Printf("Checking %d files...", len(files))
rCloneMountPath := q.debrid.GetMountPath()
path := filepath.Join(q.DownloadFolder, debridTorrent.Arr.CompletedFolder, debridTorrent.Folder) // /mnt/symlinks/{category}/MyTVShow/
err := os.MkdirAll(path, os.ModePerm)
rCloneBase := q.debrid.GetMountPath()
torrentPath, err := q.getTorrentPath(rCloneBase, debridTorrent) // /MyTVShow/
if err != nil {
q.logger.Printf("Failed to create directory: %s\n", path)
q.logger.Printf("Error: %v", err)
return
}
torrentSymlinkPath := filepath.Join(q.DownloadFolder, debridTorrent.Arr.Name, torrentPath) // /mnt/symlinks/{category}/MyTVShow/
err = os.MkdirAll(torrentSymlinkPath, os.ModePerm)
if err != nil {
q.logger.Printf("Failed to create directory: %s\n", torrentSymlinkPath)
return
}
torrentRclonePath := filepath.Join(rCloneBase, torrentPath)
for _, file := range files {
wg.Add(1)
go checkFileLoop(&wg, rCloneMountPath, file, ready)
go checkFileLoop(&wg, torrentRclonePath, file, ready)
}
go func() {
@@ -82,23 +95,49 @@ func (q *QBit) processFiles(torrent *Torrent, debridTorrent *debrid.Torrent) {
for f := range ready {
q.logger.Println("File is ready:", f.Path)
q.createSymLink(path, debridTorrent, f)
q.createSymLink(torrentSymlinkPath, torrentRclonePath, f)
}
// Update the torrent when all files are ready
torrent.TorrentPath = filepath.Base(torrentPath) // Quite important
q.UpdateTorrent(torrent, debridTorrent)
q.logger.Printf("%s COMPLETED \n", debridTorrent.Name)
q.RefreshArr(arr)
}
func (q *QBit) createSymLink(path string, torrent *debrid.Torrent, file debrid.TorrentFile) {
func (q *QBit) getTorrentPath(rclonePath string, debridTorrent *debrid.Torrent) (string, error) {
pathChan := make(chan string)
errChan := make(chan error)
go func() {
for {
torrentPath := debridTorrent.GetMountFolder(rclonePath)
if torrentPath != "" {
pathChan <- torrentPath
return
}
time.Sleep(time.Second)
}
}()
select {
case path := <-pathChan:
return path, nil
case err := <-errChan:
return "", err
}
}
func (q *QBit) createSymLink(path string, torrentMountPath string, file debrid.TorrentFile) {
// Combine the directory and filename to form a full path
fullPath := filepath.Join(path, file.Name) // /mnt/symlinks/{category}/MyTVShow/MyTVShow.S01E01.720p.mkv
// Create a symbolic link if file doesn't exist
torrentMountPath := filepath.Join(q.debrid.GetMountPath(), torrent.Folder, file.Name) // debridFolder/MyTVShow/MyTVShow.S01E01.720p.mkv
_ = os.Symlink(torrentMountPath, fullPath)
// Check if the file exists
if !fileReady(fullPath) {
torrentFilePath := filepath.Join(torrentMountPath, file.Name) // debridFolder/MyTVShow/MyTVShow.S01E01.720p.mkv
err := os.Symlink(torrentFilePath, fullPath)
if err != nil {
q.logger.Printf("Failed to create symlink: %s\n", fullPath)
}
// Check if the file exists
if !common.FileReady(fullPath) {
q.logger.Printf("Symlink not ready: %s\n", fullPath)
}
}

View File

@@ -95,7 +95,7 @@ func (ts *TorrentStorage) GetAll(category string, filter string, hashes []string
filtered = append(filtered, torrent)
}
}
return filtered
torrents = filtered
}
return torrents
}

View File

@@ -171,6 +171,7 @@ type TorrentCategory struct {
type Torrent struct {
ID string `json:"-"`
DebridTorrent *debrid.Torrent `json:"-"`
TorrentPath string `json:"-"`
AddedOn int64 `json:"added_on,omitempty"`
AmountLeft int64 `json:"amount_left,omitempty"`
@@ -218,6 +219,10 @@ type Torrent struct {
Upspeed int64 `json:"upspeed,omitempty"`
}
func (t *Torrent) IsReady() bool {
return t.AmountLeft <= 0 && t.TorrentPath != ""
}
type TorrentProperties struct {
AdditionDate int64 `json:"addition_date,omitempty"`
Comment string `json:"comment,omitempty"`

View File

@@ -17,6 +17,7 @@ func (q *QBit) MarkAsFailed(t *Torrent) *Torrent {
}
func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent {
rcLoneMount := q.debrid.GetMountPath()
if debridTorrent == nil && t.ID != "" {
debridTorrent, _ = q.debrid.GetTorrent(t.ID)
}
@@ -24,31 +25,40 @@ func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent
q.logger.Printf("Torrent with ID %s not found in %s", t.ID, q.debrid.GetName())
return t
}
totalSize := cmp.Or(debridTorrent.Bytes, 1)
progress := int64(cmp.Or(debridTorrent.Progress, 100))
progress = progress / 100.0
if debridTorrent.Status != "downloaded" {
debridTorrent, _ = q.debrid.GetTorrent(t.ID)
}
sizeCompleted := totalSize * progress
if t.TorrentPath == "" {
t.TorrentPath = filepath.Base(debridTorrent.GetMountFolder(rcLoneMount))
}
totalSize := float64(cmp.Or(debridTorrent.Bytes, 1.0))
progress := cmp.Or(debridTorrent.Progress, 100.0)
progress = progress / 100.0
var sizeCompleted int64
sizeCompleted = int64(totalSize * progress)
savePath := filepath.Join(q.DownloadFolder, t.Category) + string(os.PathSeparator)
torrentPath := filepath.Join(savePath, debridTorrent.Folder) + string(os.PathSeparator)
torrentPath := filepath.Join(savePath, t.TorrentPath) + string(os.PathSeparator)
var speed int64
if debridTorrent.Speed != 0 {
speed = int64(debridTorrent.Speed)
speed = debridTorrent.Speed
}
var eta int64
if speed != 0 {
eta = (totalSize - sizeCompleted) / speed
eta = int64((totalSize - float64(sizeCompleted)) / float64(speed))
}
t.Name = debridTorrent.Name
t.Size = debridTorrent.Bytes
t.DebridTorrent = debridTorrent
t.Completed = sizeCompleted
t.Downloaded = sizeCompleted
t.DownloadedSession = sizeCompleted
t.Uploaded = sizeCompleted
t.UploadedSession = sizeCompleted
t.AmountLeft = totalSize - sizeCompleted
t.AmountLeft = int64(totalSize) - sizeCompleted
t.Progress = float32(progress)
t.SavePath = savePath
t.ContentPath = torrentPath
@@ -56,12 +66,25 @@ func (q *QBit) UpdateTorrent(t *Torrent, debridTorrent *debrid.Torrent) *Torrent
t.Dlspeed = speed
t.Upspeed = speed
if t.AmountLeft == 0 {
if t.IsReady() {
t.State = "pausedUP"
q.storage.AddOrUpdate(t)
return t
}
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-ticker.C:
if t.IsReady() {
t.State = "pausedUP"
q.storage.AddOrUpdate(t)
ticker.Stop()
return t
} else {
return q.UpdateTorrent(t, debridTorrent)
}
}
}
go q.storage.AddOrUpdate(t)
return t
}
func (q *QBit) ResumeTorrent(t *Torrent) bool {

View File

@@ -1,24 +1,22 @@
package qbit
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"goBlack/common"
"goBlack/pkg/debrid"
"net/http"
"os"
"path/filepath"
"sync"
"time"
)
func generateSID() (string, error) {
bytes := make([]byte, sidLength)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
//func generateSID() (string, error) {
// bytes := make([]byte, sidLength)
// if _, err := rand.Read(bytes); err != nil {
// return "", err
// }
// return hex.EncodeToString(bytes), nil
//}
func JSONResponse(w http.ResponseWriter, data interface{}, code int) {
w.Header().Set("Content-Type", "application/json")
@@ -26,11 +24,6 @@ func JSONResponse(w http.ResponseWriter, data interface{}, code int) {
json.NewEncoder(w).Encode(data)
}
func fileReady(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err) // Returns true if the file exists
}
func checkFileLoop(wg *sync.WaitGroup, dir string, file debrid.TorrentFile, ready chan<- debrid.TorrentFile) {
defer wg.Done()
ticker := time.NewTicker(1 * time.Second) // Check every second
@@ -39,7 +32,7 @@ func checkFileLoop(wg *sync.WaitGroup, dir string, file debrid.TorrentFile, read
for {
select {
case <-ticker.C:
if fileReady(path) {
if common.FileReady(path) {
ready <- file
return
}

41
pkg/qbit/worker.go Normal file
View File

@@ -0,0 +1,41 @@
package qbit
import (
"context"
"goBlack/pkg/debrid"
"time"
)
func (q *QBit) StartWorker(ctx context.Context) {
q.logger.Println("Qbit Worker started")
q.StartRefreshWorker(ctx)
}
func (q *QBit) StartRefreshWorker(ctx context.Context) {
refreshCtx := context.WithValue(ctx, "worker", "refresh")
refreshTicker := time.NewTicker(time.Duration(q.RefreshInterval) * time.Second)
for {
select {
case <-refreshCtx.Done():
q.logger.Println("Qbit Refresh Worker stopped")
return
case <-refreshTicker.C:
q.RefreshArrs()
}
}
}
func (q *QBit) RefreshArrs() {
torrents := q.storage.GetAll("", "", nil)
if len(torrents) == 0 {
return
}
for host, token := range q.arrs {
arr := &debrid.Arr{
Name: "",
Token: token,
Host: host,
}
q.RefreshArr(arr)
}
}