Experimental usability stage

This commit is contained in:
Mukhtar Akere
2025-03-22 00:17:07 +01:00
parent d10b679584
commit 738474be16
14 changed files with 212 additions and 148 deletions

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"github.com/goccy/go-json"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"github.com/sirrobot01/debrid-blackhole/internal/utils"
@@ -44,16 +45,17 @@ type Cache struct {
torrentsNames map[string]*CachedTorrent // key: torrent.Name, value: torrent
listings atomic.Value
downloadLinks map[string]string // key: file.Link, value: download link
PropfindResp sync.Map
PropfindResp *xsync.MapOf[string, PropfindResponse]
workers int
LastUpdated time.Time `json:"last_updated"`
// config
workers int
torrentRefreshInterval time.Duration
downloadLinksRefreshInterval time.Duration
// refresh mutex
listingRefreshMu sync.Mutex // for refreshing torrents
downloadLinksRefreshMu sync.Mutex // for refreshing download links
torrentsRefreshMu sync.Mutex // for refreshing torrents
listingRefreshMu sync.RWMutex // for refreshing listings
downloadLinksRefreshMu sync.RWMutex // for refreshing download links
torrentsRefreshMu sync.RWMutex // for refreshing torrents
// Data Mutexes
torrentsMutex sync.RWMutex // for torrents and torrentsNames
@@ -81,7 +83,7 @@ func (c *Cache) setTorrent(t *CachedTorrent) {
c.torrentsNames[t.Name] = t
c.torrentsMutex.Unlock()
tryLock(&c.listingRefreshMu, c.refreshListings)
c.refreshListings()
go func() {
if err := c.SaveTorrent(t); err != nil {
@@ -106,7 +108,7 @@ func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
c.torrentsMutex.Unlock()
tryLock(&c.listingRefreshMu, c.refreshListings)
c.refreshListings()
go func() {
if err := c.SaveTorrents(); err != nil {
@@ -131,17 +133,27 @@ func (c *Cache) GetTorrentNames() map[string]*CachedTorrent {
return c.torrentsNames
}
func NewCache(client types.Client) *Cache {
func NewCache(dc config.Debrid, client types.Client) *Cache {
cfg := config.GetConfig()
dbPath := filepath.Join(cfg.Path, "cache", client.GetName())
torrentRefreshInterval, err := time.ParseDuration(dc.TorrentRefreshInterval)
if err != nil {
torrentRefreshInterval = time.Second * 15
}
downloadLinksRefreshInterval, err := time.ParseDuration(dc.DownloadLinksRefreshInterval)
if err != nil {
downloadLinksRefreshInterval = time.Minute * 40
}
return &Cache{
dir: dbPath,
torrents: make(map[string]*CachedTorrent),
torrentsNames: make(map[string]*CachedTorrent),
client: client,
logger: logger.NewLogger(fmt.Sprintf("%s-cache", client.GetName())),
workers: 200,
downloadLinks: make(map[string]string),
dir: filepath.Join(cfg.Path, "cache", dc.Name), // path to save cache files
torrents: make(map[string]*CachedTorrent),
torrentsNames: make(map[string]*CachedTorrent),
client: client,
logger: logger.NewLogger(fmt.Sprintf("%s-cache", client.GetName())),
workers: 200,
downloadLinks: make(map[string]string),
torrentRefreshInterval: torrentRefreshInterval,
downloadLinksRefreshInterval: downloadLinksRefreshInterval,
PropfindResp: xsync.NewMapOf[string, PropfindResponse](),
}
}
@@ -160,7 +172,7 @@ func (c *Cache) Start() error {
c.downloadLinksRefreshMu.Lock()
defer c.downloadLinksRefreshMu.Unlock()
// This prevents the download links from being refreshed twice
tryLock(&c.downloadLinksRefreshMu, c.refreshDownloadLinks)
c.refreshDownloadLinks()
}()
go func() {
@@ -462,7 +474,19 @@ func (c *Cache) GetClient() types.Client {
return c.client
}
func (c *Cache) DeleteTorrent(ids []string) {
func (c *Cache) DeleteTorrent(id string) {
c.logger.Info().Msgf("Deleting torrent %s", id)
c.torrentsMutex.Lock()
defer c.torrentsMutex.Unlock()
if t, ok := c.torrents[id]; ok {
delete(c.torrents, id)
delete(c.torrentsNames, t.Name)
c.removeFromDB(id)
}
}
func (c *Cache) DeleteTorrents(ids []string) {
c.logger.Info().Msgf("Deleting %d torrents", len(ids))
c.torrentsMutex.Lock()
defer c.torrentsMutex.Unlock()
@@ -483,6 +507,6 @@ func (c *Cache) removeFromDB(torrentId string) {
}
func (c *Cache) OnRemove(torrentId string) {
go c.DeleteTorrent([]string{torrentId})
go tryLock(&c.listingRefreshMu, c.refreshListings)
go c.DeleteTorrent(torrentId)
go c.refreshListings()
}

View File

@@ -18,11 +18,16 @@ func NewEngine() *Engine {
caches := make(map[string]*Cache)
for _, dc := range cfg.Debrids {
dc = cfg.GetDebridWebDav(dc)
client := createDebridClient(dc)
logger := client.GetLogger()
logger.Info().Msg("Debrid Service started")
if dc.UseWebdav {
caches[dc.Name] = NewCache(dc, client)
logger.Info().Msg("Debrid Service started with WebDAV")
} else {
logger.Info().Msg("Debrid Service started")
}
clients[dc.Name] = client
caches[dc.Name] = NewCache(client)
}
d := &Engine{

View File

@@ -1,10 +0,0 @@
package debrid
import "sync"
func tryLock(mu *sync.Mutex, f func()) {
if mu.TryLock() {
defer mu.Unlock()
f()
}
}

View File

@@ -1,18 +1,26 @@
package debrid
import (
"bytes"
"fmt"
"github.com/goccy/go-json"
"github.com/sirrobot01/debrid-blackhole/internal/config"
"github.com/sirrobot01/debrid-blackhole/internal/request"
"github.com/sirrobot01/debrid-blackhole/pkg/debrid/types"
"io"
"net/http"
"os"
"slices"
"sort"
"strings"
"sync"
"time"
)
func (c *Cache) refreshListings() {
if c.listingRefreshMu.TryLock() {
defer c.listingRefreshMu.Unlock()
} else {
return
}
// Copy the current torrents to avoid concurrent issues
c.torrentsMutex.RLock()
torrents := make([]string, 0, len(c.torrents))
@@ -47,6 +55,11 @@ func (c *Cache) refreshListings() {
}
func (c *Cache) refreshTorrents() {
if c.torrentsRefreshMu.TryLock() {
defer c.torrentsRefreshMu.Unlock()
} else {
return
}
c.torrentsMutex.RLock()
currentTorrents := c.torrents //
// Create a copy of the current torrents to avoid concurrent issues
@@ -69,12 +82,12 @@ func (c *Cache) refreshTorrents() {
}
// Get the newly added torrents only
newTorrents := make([]*types.Torrent, 0)
_newTorrents := make([]*types.Torrent, 0)
idStore := make(map[string]bool, len(debTorrents))
for _, t := range debTorrents {
idStore[t.Id] = true
if _, ok := torrents[t.Id]; !ok {
newTorrents = append(newTorrents, t)
_newTorrents = append(_newTorrents, t)
}
}
@@ -85,9 +98,15 @@ func (c *Cache) refreshTorrents() {
deletedTorrents = append(deletedTorrents, id)
}
}
newTorrents := make([]*types.Torrent, 0)
for _, t := range _newTorrents {
if !slices.Contains(deletedTorrents, t.Id) {
_newTorrents = append(_newTorrents, t)
}
}
if len(deletedTorrents) > 0 {
c.DeleteTorrent(deletedTorrents)
c.DeleteTorrents(deletedTorrents)
}
if len(newTorrents) == 0 {
@@ -112,34 +131,37 @@ func (c *Cache) refreshTorrents() {
}
func (c *Cache) RefreshRclone() error {
params := map[string]interface{}{
"recursive": "false",
}
client := request.Default()
cfg := config.GetConfig().WebDav
// Convert parameters to JSON
jsonParams, err := json.Marshal(params)
if cfg.RcUrl == "" {
return nil
}
// Create form data
data := "dir=__all__&dir2=torrents"
// Create a POST request with form URL-encoded content
forgetReq, err := http.NewRequest("POST", fmt.Sprintf("%s/vfs/forget", cfg.RcUrl), strings.NewReader(data))
if err != nil {
return err
}
// Create HTTP request
url := "http://192.168.0.219:9990/vfs/refresh" // Switch to config
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonParams))
if err != nil {
return err
if cfg.RcUser != "" && cfg.RcPass != "" {
forgetReq.SetBasicAuth(cfg.RcUser, cfg.RcPass)
}
// Set the appropriate headers
req.Header.Set("Content-Type", "application/json")
// Set the appropriate content type for form data
forgetReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// Send the request
client := &http.Client{}
resp, err := client.Do(req)
forgetResp, err := client.Do(forgetReq)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to refresh rclone: %s", resp.Status)
defer forgetResp.Body.Close()
if forgetResp.StatusCode != 200 {
body, _ := io.ReadAll(forgetResp.Body)
return fmt.Errorf("failed to forget rclone: %s - %s", forgetResp.Status, string(body))
}
return nil
}
@@ -166,6 +188,11 @@ func (c *Cache) refreshTorrent(t *CachedTorrent) *CachedTorrent {
}
func (c *Cache) refreshDownloadLinks() {
if c.downloadLinksRefreshMu.TryLock() {
defer c.downloadLinksRefreshMu.Unlock()
} else {
return
}
c.downloadLinksMutex.Lock()
defer c.downloadLinksMutex.Unlock()

View File

@@ -11,25 +11,25 @@ func (c *Cache) Refresh() error {
}
func (c *Cache) refreshDownloadLinksWorker() {
refreshTicker := time.NewTicker(40 * time.Minute)
refreshTicker := time.NewTicker(c.downloadLinksRefreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-refreshTicker.C:
tryLock(&c.downloadLinksRefreshMu, c.refreshDownloadLinks)
c.refreshDownloadLinks()
}
}
}
func (c *Cache) refreshTorrentsWorker() {
refreshTicker := time.NewTicker(5 * time.Second)
refreshTicker := time.NewTicker(c.torrentRefreshInterval)
defer refreshTicker.Stop()
for {
select {
case <-refreshTicker.C:
tryLock(&c.torrentsRefreshMu, c.refreshTorrents)
c.refreshTorrents()
}
}
}

View File

@@ -6,14 +6,16 @@ import (
"github.com/sirrobot01/debrid-blackhole/internal/request"
"net/http"
"net/url"
"os"
path "path/filepath"
"time"
)
func (c *Cache) RefreshXml() error {
parents := []string{"__all__", "torrents"}
torrents := c.GetListing()
for _, parent := range parents {
if err := c.refreshParentXml(parent); err != nil {
if err := c.refreshParentXml(torrents, parent); err != nil {
return fmt.Errorf("failed to refresh XML for %s: %v", parent, err)
}
}
@@ -22,7 +24,7 @@ func (c *Cache) RefreshXml() error {
return nil
}
func (c *Cache) refreshParentXml(parent string) error {
func (c *Cache) refreshParentXml(torrents []os.FileInfo, parent string) error {
// Define the WebDAV namespace
davNS := "DAV:"
@@ -37,20 +39,21 @@ func (c *Cache) refreshParentXml(parent string) error {
currentTime := time.Now().UTC().Format(http.TimeFormat)
// Add the parent directory
parentPath := fmt.Sprintf("/webdav/%s/%s/", c.client.GetName(), parent)
baseUrl := path.Clean(fmt.Sprintf("/webdav/%s/%s", c.client.GetName(), parent))
parentPath := fmt.Sprintf("%s/", baseUrl)
addDirectoryResponse(multistatus, parentPath, parent, currentTime)
// Add torrents to the XML
torrents := c.GetListing()
for _, torrent := range torrents {
torrentName := torrent.Name()
name := torrent.Name()
// Note the path structure change - parent first, then torrent name
torrentPath := fmt.Sprintf("/webdav/%s/%s/%s/",
c.client.GetName(),
url.PathEscape(torrentName),
parent,
url.PathEscape(name),
)
addDirectoryResponse(multistatus, torrentPath, torrentName, currentTime)
addDirectoryResponse(multistatus, torrentPath, name, currentTime)
}
// Convert to XML string
@@ -60,8 +63,6 @@ func (c *Cache) refreshParentXml(parent string) error {
}
// Store in cache
// Construct the keys
baseUrl := path.Clean(fmt.Sprintf("/webdav/%s/%s", c.client.GetName()))
key0 := fmt.Sprintf("propfind:%s:0", baseUrl)
key1 := fmt.Sprintf("propfind:%s:1", baseUrl)
@@ -78,7 +79,7 @@ func (c *Cache) refreshParentXml(parent string) error {
func addDirectoryResponse(multistatus *etree.Element, href, displayName, modTime string) *etree.Element {
responseElem := multistatus.CreateElement("D:response")
// Add href
// Add href - ensure it's properly formatted
hrefElem := responseElem.CreateElement("D:href")
hrefElem.SetText(href)
@@ -100,6 +101,14 @@ func addDirectoryResponse(multistatus *etree.Element, href, displayName, modTime
lastModElem := propElem.CreateElement("D:getlastmodified")
lastModElem.SetText(modTime)
// Add content type for directories
contentTypeElem := propElem.CreateElement("D:getcontenttype")
contentTypeElem.SetText("httpd/unix-directory")
// Add length (size) - directories typically have zero size
contentLengthElem := propElem.CreateElement("D:getcontentlength")
contentLengthElem.SetText("0")
// Add supported lock
lockElem := propElem.CreateElement("D:supportedlock")
lockEntryElem := lockElem.CreateElement("D:lockentry")