Optimize caching, speed up imports
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -35,12 +36,6 @@ const (
|
||||
WebdavUseHash WebDavFolderNaming = "infohash"
|
||||
)
|
||||
|
||||
type PropfindResponse struct {
|
||||
Data []byte
|
||||
GzippedData []byte
|
||||
Ts time.Time
|
||||
}
|
||||
|
||||
type CachedTorrent struct {
|
||||
*types.Torrent
|
||||
AddedOn time.Time `json:"added_on"`
|
||||
@@ -79,7 +74,7 @@ type Cache struct {
|
||||
listings atomic.Value
|
||||
downloadLinks *xsync.MapOf[string, downloadLinkCache]
|
||||
invalidDownloadLinks *xsync.MapOf[string, string]
|
||||
PropfindResp *xsync.MapOf[string, PropfindResponse]
|
||||
PropfindResp *PropfindCache
|
||||
folderNaming WebDavFolderNaming
|
||||
|
||||
// monitors
|
||||
@@ -120,13 +115,13 @@ func New(dc config.Debrid, client types.Client) *Cache {
|
||||
torrents: xsync.NewMapOf[string, string](),
|
||||
torrentsNames: xsync.NewMapOf[string, *CachedTorrent](),
|
||||
invalidDownloadLinks: xsync.NewMapOf[string, string](),
|
||||
PropfindResp: NewPropfindCache(),
|
||||
client: client,
|
||||
logger: logger.New(fmt.Sprintf("%s-webdav", client.GetName())),
|
||||
workers: dc.Workers,
|
||||
downloadLinks: xsync.NewMapOf[string, downloadLinkCache](),
|
||||
torrentRefreshInterval: dc.TorrentsRefreshInterval,
|
||||
downloadLinksRefreshInterval: dc.DownloadLinksRefreshInterval,
|
||||
PropfindResp: xsync.NewMapOf[string, PropfindResponse](),
|
||||
folderNaming: WebDavFolderNaming(dc.FolderNaming),
|
||||
autoExpiresLinksAfterDuration: autoExpiresLinksAfter,
|
||||
saveSemaphore: make(chan struct{}, 50),
|
||||
@@ -240,7 +235,7 @@ func (c *Cache) load() (map[string]*CachedTorrent, error) {
|
||||
}
|
||||
ct.IsComplete = true
|
||||
ct.Files = fs
|
||||
ct.Name = filepath.Clean(ct.Name)
|
||||
ct.Name = path.Clean(ct.Name)
|
||||
results.Store(ct.Id, &ct)
|
||||
}
|
||||
}
|
||||
@@ -315,7 +310,9 @@ func (c *Cache) Sync() error {
|
||||
}
|
||||
|
||||
// Write these torrents to the cache
|
||||
c.setTorrents(cachedTorrents)
|
||||
c.setTorrents(cachedTorrents, func() {
|
||||
go c.RefreshListings(false)
|
||||
}) // This is set to false, cos it's likely rclone hs not started yet.
|
||||
c.logger.Info().Msgf("Loaded %d torrents from cache", len(cachedTorrents))
|
||||
|
||||
if len(newTorrents) > 0 {
|
||||
@@ -393,23 +390,23 @@ func (c *Cache) sync(torrents []*types.Torrent) error {
|
||||
func (c *Cache) GetTorrentFolder(torrent *types.Torrent) string {
|
||||
switch c.folderNaming {
|
||||
case WebDavUseFileName:
|
||||
return filepath.Clean(torrent.Filename)
|
||||
return path.Clean(torrent.Filename)
|
||||
case WebDavUseOriginalName:
|
||||
return filepath.Clean(torrent.OriginalFilename)
|
||||
return path.Clean(torrent.OriginalFilename)
|
||||
case WebDavUseFileNameNoExt:
|
||||
return filepath.Clean(utils.RemoveExtension(torrent.Filename))
|
||||
return path.Clean(utils.RemoveExtension(torrent.Filename))
|
||||
case WebDavUseOriginalNameNoExt:
|
||||
return filepath.Clean(utils.RemoveExtension(torrent.OriginalFilename))
|
||||
return path.Clean(utils.RemoveExtension(torrent.OriginalFilename))
|
||||
case WebDavUseID:
|
||||
return torrent.Id
|
||||
case WebdavUseHash:
|
||||
return strings.ToLower(torrent.InfoHash)
|
||||
default:
|
||||
return filepath.Clean(torrent.Filename)
|
||||
return path.Clean(torrent.Filename)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) setTorrent(t *CachedTorrent) {
|
||||
func (c *Cache) setTorrent(t *CachedTorrent, callback func(torrent *CachedTorrent)) {
|
||||
torrentKey := c.GetTorrentFolder(t.Torrent)
|
||||
c.torrents.Store(t.Id, torrentKey) // Store the torrent id with the folder name(we might change the id after, hence why it's stored here)
|
||||
if o, ok := c.torrentsNames.Load(torrentKey); ok && o.Id != t.Id {
|
||||
@@ -426,9 +423,12 @@ func (c *Cache) setTorrent(t *CachedTorrent) {
|
||||
}
|
||||
c.torrentsNames.Store(torrentKey, t)
|
||||
c.SaveTorrent(t)
|
||||
if callback != nil {
|
||||
callback(t)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
|
||||
func (c *Cache) setTorrents(torrents map[string]*CachedTorrent, callback func()) {
|
||||
for _, t := range torrents {
|
||||
torrentKey := c.GetTorrentFolder(t.Torrent)
|
||||
c.torrents.Store(t.Id, torrentKey)
|
||||
@@ -442,8 +442,10 @@ func (c *Cache) setTorrents(torrents map[string]*CachedTorrent) {
|
||||
}
|
||||
c.torrentsNames.Store(torrentKey, t)
|
||||
}
|
||||
c.RefreshListings(false)
|
||||
c.SaveTorrents()
|
||||
if callback != nil {
|
||||
callback()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) GetListing() []os.FileInfo {
|
||||
@@ -595,7 +597,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
|
||||
IsComplete: len(t.Files) > 0,
|
||||
AddedOn: addedOn,
|
||||
}
|
||||
c.setTorrent(ct)
|
||||
c.setTorrent(ct, nil)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -615,8 +617,9 @@ func (c *Cache) AddTorrent(t *types.Torrent) error {
|
||||
IsComplete: len(t.Files) > 0,
|
||||
AddedOn: addedOn,
|
||||
}
|
||||
c.setTorrent(ct)
|
||||
c.RefreshListings(true)
|
||||
c.setTorrent(ct, func(tor *CachedTorrent) {
|
||||
go c.RefreshListings(true)
|
||||
})
|
||||
go c.GenerateDownloadLinks(ct)
|
||||
return nil
|
||||
|
||||
@@ -686,7 +689,7 @@ func (c *Cache) deleteTorrent(id string, removeFromDebrid bool) bool {
|
||||
t.Files = newFiles
|
||||
newId = cmp.Or(newId, t.Id)
|
||||
t.Id = newId
|
||||
c.setTorrent(t)
|
||||
c.setTorrent(t, nil)
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
59
pkg/debrid/debrid/propfind.go
Normal file
59
pkg/debrid/debrid/propfind.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package debrid
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PropfindResponse struct {
|
||||
Data []byte
|
||||
GzippedData []byte
|
||||
Ts time.Time
|
||||
}
|
||||
|
||||
type PropfindCache struct {
|
||||
sync.RWMutex
|
||||
data map[string]PropfindResponse
|
||||
}
|
||||
|
||||
func NewPropfindCache() *PropfindCache {
|
||||
return &PropfindCache{
|
||||
data: make(map[string]PropfindResponse),
|
||||
}
|
||||
}
|
||||
|
||||
func generateCacheKey(urlPath string) string {
|
||||
cleanPath := path.Clean(urlPath)
|
||||
|
||||
// Create a more collision-resistant key by hashing
|
||||
h := sha256.New()
|
||||
h.Write([]byte(fmt.Sprintf("propfind:%s", cleanPath)))
|
||||
return hex.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
func (c *PropfindCache) Get(url string) (PropfindResponse, bool) {
|
||||
key := generateCacheKey(url)
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
val, exists := c.data[key]
|
||||
return val, exists
|
||||
}
|
||||
|
||||
// Set stores an item in the cache
|
||||
func (c *PropfindCache) Set(url string, value PropfindResponse) {
|
||||
key := generateCacheKey(url)
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.data[key] = value
|
||||
}
|
||||
|
||||
func (c *PropfindCache) Remove(urlPath string) {
|
||||
key := generateCacheKey(urlPath)
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
delete(c.data, key)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package debrid
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/sirrobot01/decypharr/internal/config"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
@@ -64,7 +65,7 @@ func (c *Cache) RefreshListings(refreshRclone bool) {
|
||||
}
|
||||
|
||||
if refreshRclone {
|
||||
if err := c.RefreshRclone(); err != nil {
|
||||
if err := c.refreshRclone(); err != nil {
|
||||
c.logger.Trace().Err(err).Msg("Failed to refresh rclone") // silent error
|
||||
}
|
||||
}
|
||||
@@ -148,59 +149,69 @@ func (c *Cache) refreshTorrents() {
|
||||
c.logger.Debug().Msgf("Processed %d new torrents", counter)
|
||||
}
|
||||
|
||||
func (c *Cache) RefreshRclone() error {
|
||||
func (c *Cache) refreshRclone() error {
|
||||
cfg := config.Get().WebDav
|
||||
|
||||
if cfg.RcUrl == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if cfg.RcUrl == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create an optimized HTTP client
|
||||
client := &http.Client{
|
||||
Timeout: 5 * time.Second,
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 10,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
DisableCompression: false,
|
||||
MaxIdleConnsPerHost: 5,
|
||||
},
|
||||
}
|
||||
// Create form data
|
||||
data := "dir=__all__&dir2=torrents"
|
||||
|
||||
// Create a POST request with form URL-encoded content
|
||||
forgetReq, err := http.NewRequest("POST", fmt.Sprintf("%s/vfs/forget", cfg.RcUrl), strings.NewReader(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cfg.RcUser != "" && cfg.RcPass != "" {
|
||||
forgetReq.SetBasicAuth(cfg.RcUser, cfg.RcPass)
|
||||
sendRequest := func(endpoint string) error {
|
||||
req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", cfg.RcUrl, endpoint), strings.NewReader(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
|
||||
if cfg.RcUser != "" && cfg.RcPass != "" {
|
||||
req.SetBasicAuth(cfg.RcUser, cfg.RcPass)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
req = req.WithContext(ctx)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
// Only read a limited amount of the body on error
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
|
||||
return fmt.Errorf("failed to perform %s: %s - %s", endpoint, resp.Status, string(body))
|
||||
}
|
||||
|
||||
// Discard response body to reuse connection
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set the appropriate content type for form data
|
||||
forgetReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
|
||||
// Send the request
|
||||
client := &http.Client{}
|
||||
client.Timeout = 10 * time.Second
|
||||
forgetResp, err := client.Do(forgetReq)
|
||||
if err != nil {
|
||||
if err := sendRequest("vfs/forget"); err != nil {
|
||||
return err
|
||||
}
|
||||
defer forgetResp.Body.Close()
|
||||
|
||||
if forgetResp.StatusCode != 200 {
|
||||
body, _ := io.ReadAll(forgetResp.Body)
|
||||
return fmt.Errorf("failed to forget rclone: %s - %s", forgetResp.Status, string(body))
|
||||
}
|
||||
|
||||
// Run vfs/refresh
|
||||
refreshReq, err := http.NewRequest("POST", fmt.Sprintf("%s/vfs/refresh", cfg.RcUrl), strings.NewReader(data))
|
||||
if err != nil {
|
||||
if err := sendRequest("vfs/refresh"); err != nil {
|
||||
return err
|
||||
}
|
||||
if cfg.RcUser != "" && cfg.RcPass != "" {
|
||||
refreshReq.SetBasicAuth(cfg.RcUser, cfg.RcPass)
|
||||
}
|
||||
refreshReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
refreshResp, err := client.Do(refreshReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer refreshResp.Body.Close()
|
||||
if refreshResp.StatusCode != 200 {
|
||||
body, _ := io.ReadAll(refreshResp.Body)
|
||||
return fmt.Errorf("failed to refresh rclone: %s - %s", refreshResp.Status, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -220,7 +231,9 @@ func (c *Cache) refreshTorrent(torrentId string) *CachedTorrent {
|
||||
AddedOn: addedOn,
|
||||
IsComplete: len(torrent.Files) > 0,
|
||||
}
|
||||
c.setTorrent(ct)
|
||||
c.setTorrent(ct, func(torrent *CachedTorrent) {
|
||||
go c.RefreshListings(false)
|
||||
})
|
||||
|
||||
return ct
|
||||
}
|
||||
|
||||
@@ -200,8 +200,9 @@ func (c *Cache) reInsertTorrent(ct *CachedTorrent) (*CachedTorrent, error) {
|
||||
AddedOn: addedOn,
|
||||
IsComplete: len(newTorrent.Files) > 0,
|
||||
}
|
||||
c.setTorrent(ct)
|
||||
c.RefreshListings(true)
|
||||
c.setTorrent(ct, func(torrent *CachedTorrent) {
|
||||
go c.RefreshListings(true)
|
||||
})
|
||||
|
||||
// We can safely delete the old torrent here
|
||||
if oldID != "" {
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"github.com/sirrobot01/decypharr/internal/request"
|
||||
"net/http"
|
||||
"os"
|
||||
path "path/filepath"
|
||||
"path"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -60,17 +60,12 @@ func (c *Cache) refreshFolderXml(torrents []os.FileInfo, clientName, parent stri
|
||||
return fmt.Errorf("failed to generate XML: %v", err)
|
||||
}
|
||||
|
||||
// Store in cache
|
||||
key0 := fmt.Sprintf("propfind:%s:0", baseUrl)
|
||||
key1 := fmt.Sprintf("propfind:%s:1", baseUrl)
|
||||
|
||||
res := PropfindResponse{
|
||||
Data: xmlData,
|
||||
GzippedData: request.Gzip(xmlData),
|
||||
Ts: time.Now(),
|
||||
}
|
||||
c.PropfindResp.Store(key0, res)
|
||||
c.PropfindResp.Store(key1, res)
|
||||
c.PropfindResp.Set(baseUrl, res)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -52,9 +52,9 @@ func (h *Handler) RemoveAll(ctx context.Context, name string) error {
|
||||
if name[0] != '/' {
|
||||
name = "/" + name
|
||||
}
|
||||
name = filepath.Clean(name)
|
||||
name = path.Clean(name)
|
||||
|
||||
rootDir := filepath.Clean(h.getRootPath())
|
||||
rootDir := path.Clean(h.getRootPath())
|
||||
|
||||
if name == rootDir {
|
||||
return os.ErrPermission
|
||||
@@ -112,8 +112,8 @@ func (h *Handler) OpenFile(ctx context.Context, name string, flag int, perm os.F
|
||||
if name[0] != '/' {
|
||||
name = "/" + name
|
||||
}
|
||||
name = utils.UnescapePath(filepath.Clean(name))
|
||||
rootDir := filepath.Clean(h.getRootPath())
|
||||
name = utils.UnescapePath(path.Clean(name))
|
||||
rootDir := path.Clean(h.getRootPath())
|
||||
|
||||
metadataOnly := ctx.Value("metadataOnly") != nil
|
||||
|
||||
@@ -257,7 +257,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Set metadata only
|
||||
ctx := context.WithValue(r.Context(), "metadataOnly", true)
|
||||
r = r.WithContext(ctx)
|
||||
cleanPath := filepath.Clean(r.URL.Path)
|
||||
cleanPath := path.Clean(r.URL.Path)
|
||||
if r.Header.Get("Depth") == "" {
|
||||
r.Header.Set("Depth", "1")
|
||||
}
|
||||
@@ -266,21 +266,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("Depth") == "infinity" {
|
||||
r.Header.Set("Depth", "1")
|
||||
}
|
||||
depth := r.Header.Get("Depth")
|
||||
// Use both path and Depth header to form the cache key.
|
||||
cacheKey := fmt.Sprintf("propfind:%s:%s", cleanPath, depth)
|
||||
|
||||
// Determine TTL based on the requested folder:
|
||||
// - If the path is exactly the parent folder (which changes frequently),
|
||||
// use a short TTL.
|
||||
// - Otherwise, for deeper (torrent folder) paths, use a longer TTL.
|
||||
ttl := 1 * time.Minute
|
||||
if h.isParentPath(r.URL.Path) {
|
||||
// __all__ or torrents folder
|
||||
ttl = 30 * time.Second
|
||||
}
|
||||
|
||||
if served := h.serveFromCacheIfValid(w, r, cacheKey, ttl); served {
|
||||
if served := h.serveFromCacheIfValid(w, r, cleanPath); served {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -301,7 +288,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Create compressed version
|
||||
|
||||
h.cache.PropfindResp.Store(cacheKey, debrid.PropfindResponse{
|
||||
h.cache.PropfindResp.Set(cleanPath, debrid.PropfindResponse{
|
||||
Data: responseData,
|
||||
GzippedData: gzippedData,
|
||||
Ts: time.Now(),
|
||||
@@ -451,25 +438,27 @@ func getContentType(fileName string) string {
|
||||
return contentType
|
||||
}
|
||||
|
||||
func (h *Handler) isParentPath(_path string) bool {
|
||||
rootPath := h.getRootPath()
|
||||
func (h *Handler) isParentPath(urlPath string) bool {
|
||||
parents := h.getParentItems()
|
||||
lastComponent := path.Base(urlPath)
|
||||
for _, p := range parents {
|
||||
if filepath.Clean(_path) == filepath.Clean(filepath.Join(rootPath, p)) {
|
||||
if p == lastComponent {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request, cacheKey string, ttl time.Duration) bool {
|
||||
respCache, ok := h.cache.PropfindResp.Load(cacheKey)
|
||||
func (h *Handler) serveFromCacheIfValid(w http.ResponseWriter, r *http.Request, urlPath string) bool {
|
||||
respCache, ok := h.cache.PropfindResp.Get(urlPath)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
ttl := h.getCacheTTL(urlPath)
|
||||
|
||||
if time.Since(respCache.Ts) >= ttl {
|
||||
// Remove expired cache entry
|
||||
h.cache.PropfindResp.Remove(urlPath)
|
||||
return false
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/xml; charset=utf-8")
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// getName: Returns the torrent name and filename from the path
|
||||
@@ -27,3 +28,14 @@ func isValidURL(str string) bool {
|
||||
// A valid URL should parse without error, and have a non-empty scheme and host.
|
||||
return err == nil && u.Scheme != "" && u.Host != ""
|
||||
}
|
||||
|
||||
// Determine TTL based on the requested folder:
|
||||
// - If the path is exactly the parent folder (which changes frequently),
|
||||
// use a short TTL.
|
||||
// - Otherwise, for deeper (torrent folder) paths, use a longer TTL.
|
||||
func (h *Handler) getCacheTTL(urlPath string) time.Duration {
|
||||
if h.isParentPath(urlPath) {
|
||||
return 30 * time.Second // Short TTL for parent folders
|
||||
}
|
||||
return 2 * time.Minute // Longer TTL for other paths
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user