From 5aa1c67544a0f6e0bcfc1a994ba100b36186e3a3 Mon Sep 17 00:00:00 2001 From: Mukhtar Akere Date: Tue, 20 May 2025 12:57:27 +0100 Subject: [PATCH] - Add PROPFIND for root path - Reduce signifcantly memoery footprint - Fix minor bugs --- cmd/decypharr/main.go | 190 ++++++++++++-------------- cmd/healthcheck/main.go | 2 +- go.mod | 1 - go.sum | 2 - internal/config/config.go | 2 +- internal/request/discord.go | 2 +- internal/request/request.go | 32 ++++- internal/utils/debouncer.go | 10 ++ internal/utils/scheduler.go | 18 ++- pkg/arr/arr.go | 22 ++- pkg/arr/content.go | 2 +- pkg/arr/history.go | 2 +- pkg/arr/import.go | 2 +- pkg/debrid/alldebrid/alldebrid.go | 2 +- pkg/debrid/debrid/cache.go | 120 +++++++++++----- pkg/debrid/debrid/download_link.go | 10 +- pkg/debrid/debrid/engine.go | 18 ++- pkg/debrid/debrid/refresh.go | 51 +++---- pkg/debrid/debrid/repair.go | 6 +- pkg/debrid/debrid/torrent.go | 28 ++-- pkg/debrid/debrid/worker.go | 50 +++++-- pkg/debrid/debrid_link/debrid_link.go | 2 +- pkg/debrid/realdebrid/realdebrid.go | 2 +- pkg/debrid/realdebrid/types.go | 2 +- pkg/debrid/torbox/torbox.go | 2 +- pkg/qbit/qbit.go | 8 ++ pkg/qbit/storage.go | 8 +- pkg/repair/repair.go | 116 ++++++++++------ pkg/server/server.go | 24 ++-- pkg/server/webhook.go | 2 +- pkg/service/service.go | 11 ++ pkg/web/api.go | 5 +- pkg/web/templates/layout.html | 25 +--- pkg/web/ui.go | 2 +- pkg/webdav/misc.go | 113 +++++++++++++++ pkg/webdav/webdav.go | 73 ++++++---- 36 files changed, 632 insertions(+), 335 deletions(-) diff --git a/cmd/decypharr/main.go b/cmd/decypharr/main.go index f7b8579..3e243e2 100644 --- a/cmd/decypharr/main.go +++ b/cmd/decypharr/main.go @@ -14,10 +14,10 @@ import ( "github.com/sirrobot01/decypharr/pkg/worker" "net/http" "os" + "runtime" "runtime/debug" "strconv" "sync" - "time" ) func Start(ctx context.Context) error { @@ -30,116 +30,90 @@ func Start(ctx context.Context) error { SetUmask(int(umask)) } - appCtx := ctx - - // Service context - can be cancelled and recreated for restarts - svcCtx, cancelSvc := context.WithCancel(context.Background()) - - // Create a channel to listen for restart signals restartCh := make(chan struct{}, 1) - - // Create a function to expose for requesting restarts - RequestRestart := func() { + web.SetRestartFunc(func() { select { case restartCh <- struct{}{}: - // Signal sent successfully default: - // Channel is full, ignore + } + }) + + svcCtx, cancelSvc := context.WithCancel(ctx) + defer cancelSvc() + + for { + cfg := config.Get() + _log := logger.Default() + + // ascii banner + fmt.Printf(` ++-------------------------------------------------------+ +| | +| ╔╦╗╔═╗╔═╗╦ ╦╔═╗╦ ╦╔═╗╦═╗╦═╗ | +| ║║║╣ ║ └┬┘╠═╝╠═╣╠═╣╠╦╝╠╦╝ (%s) | +| ═╩╝╚═╝╚═╝ ┴ ╩ ╩ ╩╩ ╩╩╚═╩╚═ | +| | ++-------------------------------------------------------+ +| Log Level: %s | ++-------------------------------------------------------+ +`, version.GetInfo(), cfg.LogLevel) + + // Initialize services + qb := qbit.New() + wd := webdav.New() + + ui := web.New(qb).Routes() + webdavRoutes := wd.Routes() + qbitRoutes := qb.Routes() + + // Register routes + handlers := map[string]http.Handler{ + "/": ui, + "/api/v2": qbitRoutes, + "/webdav": webdavRoutes, + } + srv := server.New(handlers) + + done := make(chan struct{}) + go func(ctx context.Context) { + if err := startServices(ctx, wd, srv); err != nil { + _log.Error().Err(err).Msg("Error starting services") + cancelSvc() + } + close(done) + }(svcCtx) + + select { + case <-ctx.Done(): + // graceful shutdown + cancelSvc() // propagate to services + <-done // wait for them to finish + return nil + + case <-restartCh: + cancelSvc() // tell existing services to shut down + _log.Info().Msg("Restarting Decypharr...") + <-done // wait for them to finish + qb.Reset() + service.Reset() + + // rebuild svcCtx off the original parent + svcCtx, cancelSvc = context.WithCancel(ctx) + runtime.GC() + + config.Reload() + service.Reset() + // loop will restart services automatically } } - - web.SetRestartFunc(RequestRestart) - - go func() { - for { - select { - case <-appCtx.Done(): - // Parent context is done, exit the loop and shut down all services - cancelSvc() - return - case <-restartCh: - _log := logger.Default() - _log.Info().Msg("Restarting services with new config...") - - // Cancel current service context to shut down all services - cancelSvc() - - // Wait a moment for services to shut down - time.Sleep(500 * time.Millisecond) - - // Create a new service context - svcCtx, cancelSvc = context.WithCancel(context.Background()) - - // Reload configuration - config.Reload() - service.Reset() - - // Start services again with new context - go func() { - err := startServices(svcCtx) - if err != nil { - _log.Error().Err(err).Msg("Error restarting services") - } - }() - - _log.Info().Msg("Services restarted successfully") - } - } - }() - - go func() { - err := startServices(svcCtx) - if err != nil { - _log := logger.Default() - _log.Error().Err(err).Msg("Error starting services") - } - }() - - // Start services for the first time - <-appCtx.Done() - - // Clean up - cancelSvc() - return nil } -func startServices(ctx context.Context) error { - cfg := config.Get() +func startServices(ctx context.Context, wd *webdav.WebDav, srv *server.Server) error { var wg sync.WaitGroup errChan := make(chan error) _log := logger.Default() - asciiArt := ` -+-------------------------------------------------------+ -| | -| ╔╦╗╔═╗╔═╗╦ ╦╔═╗╦ ╦╔═╗╦═╗╦═╗ | -| ║║║╣ ║ └┬┘╠═╝╠═╣╠═╣╠╦╝╠╦╝ (%s) | -| ═╩╝╚═╝╚═╝ ┴ ╩ ╩ ╩╩ ╩╩╚═╩╚═ | -| | -+-------------------------------------------------------+ -| Log Level: %s | -+-------------------------------------------------------+ -` - - fmt.Printf(asciiArt, version.GetInfo(), cfg.LogLevel) - - svc := service.GetService() - _qbit := qbit.New() - _webdav := webdav.New() - - ui := web.New(_qbit).Routes() - webdavRoutes := _webdav.Routes() - qbitRoutes := _qbit.Routes() - - // Register routes - handlers := map[string]http.Handler{ - "/": ui, - "/api/v2": qbitRoutes, - "/webdav": webdavRoutes, - } - srv := server.New(handlers) - safeGo := func(f func() error) { wg.Add(1) go func() { @@ -164,7 +138,7 @@ func startServices(ctx context.Context) error { } safeGo(func() error { - return _webdav.Start(ctx) + return wd.Start(ctx) }) safeGo(func() error { @@ -176,16 +150,22 @@ func startServices(ctx context.Context) error { }) safeGo(func() error { - return svc.Arr.StartSchedule(ctx) + arr := service.GetService().Arr + if arr == nil { + return nil + } + return arr.StartSchedule(ctx) }) - if cfg.Repair.Enabled { + if cfg := config.Get(); cfg.Repair.Enabled { safeGo(func() error { - err := svc.Repair.Start(ctx) - if err != nil { - _log.Error().Err(err).Msg("Error starting repair") + r := service.GetService().Repair + if r != nil { + if err := r.Start(ctx); err != nil { + _log.Error().Err(err).Msg("repair failed") + } } - return nil // Not propagating repair errors to terminate the app + return nil }) } diff --git a/cmd/healthcheck/main.go b/cmd/healthcheck/main.go index 4a3ab9d..d5d740b 100644 --- a/cmd/healthcheck/main.go +++ b/cmd/healthcheck/main.go @@ -17,7 +17,7 @@ import ( type HealthStatus struct { QbitAPI bool `json:"qbit_api"` WebUI bool `json:"web_ui"` - WebDAVService bool `json:"webdav_service,omitempty"` + WebDAVService bool `json:"webdav_service"` OverallStatus bool `json:"overall_status"` } diff --git a/go.mod b/go.mod index 906ef23..e4c7fa9 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/cavaliergopher/grab/v3 v3.0.1 github.com/go-chi/chi/v5 v5.1.0 github.com/go-co-op/gocron/v2 v2.16.1 - github.com/goccy/go-json v0.10.5 github.com/google/uuid v1.6.0 github.com/gorilla/sessions v1.4.0 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index ed372cb..d764339 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,6 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= -github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/internal/config/config.go b/internal/config/config.go index 20409b1..6f434ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,9 +2,9 @@ package config import ( "cmp" + "encoding/json" "errors" "fmt" - "github.com/goccy/go-json" "os" "path/filepath" "runtime" diff --git a/internal/request/discord.go b/internal/request/discord.go index 4eeaadf..7e618ff 100644 --- a/internal/request/discord.go +++ b/internal/request/discord.go @@ -2,8 +2,8 @@ package request import ( "bytes" + "encoding/json" "fmt" - "github.com/goccy/go-json" "github.com/sirrobot01/decypharr/internal/config" "io" "net/http" diff --git a/internal/request/request.go b/internal/request/request.go index 04ae865..a6acd40 100644 --- a/internal/request/request.go +++ b/internal/request/request.go @@ -5,8 +5,9 @@ import ( "compress/gzip" "context" "crypto/tls" + "encoding/json" + "errors" "fmt" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/logger" "golang.org/x/net/proxy" @@ -180,7 +181,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { resp, err = c.doRequest(req) if err != nil { // Check if this is a network error that might be worth retrying - if attempt < c.maxRetries { + if isRetryableError(err) && attempt < c.maxRetries { // Apply backoff with jitter jitter := time.Duration(rand.Int63n(int64(backoff / 4))) sleepTime := backoff + jitter @@ -413,3 +414,30 @@ func Default() *Client { }) return instance } + +func isRetryableError(err error) bool { + errString := err.Error() + + // Connection reset and other network errors + if strings.Contains(errString, "connection reset by peer") || + strings.Contains(errString, "read: connection reset") || + strings.Contains(errString, "connection refused") || + strings.Contains(errString, "network is unreachable") || + strings.Contains(errString, "connection timed out") || + strings.Contains(errString, "no such host") || + strings.Contains(errString, "i/o timeout") || + strings.Contains(errString, "unexpected EOF") || + strings.Contains(errString, "TLS handshake timeout") { + return true + } + + // Check for net.Error type which can provide more information + var netErr net.Error + if errors.As(err, &netErr) { + // Retry on timeout errors and temporary errors + return netErr.Timeout() || netErr.Temporary() + } + + // Not a retryable error + return false +} diff --git a/internal/utils/debouncer.go b/internal/utils/debouncer.go index 9d4ecf3..e8244bf 100644 --- a/internal/utils/debouncer.go +++ b/internal/utils/debouncer.go @@ -31,3 +31,13 @@ func (d *Debouncer[T]) Call(arg T) { d.caller(arg) }) } + +func (d *Debouncer[T]) Stop() { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil { + d.timer.Stop() + d.timer = nil + } +} diff --git a/internal/utils/scheduler.go b/internal/utils/scheduler.go index a2c5e24..c166d07 100644 --- a/internal/utils/scheduler.go +++ b/internal/utils/scheduler.go @@ -10,29 +10,27 @@ import ( "time" ) -func ScheduleJob(ctx context.Context, interval string, loc *time.Location, jobFunc func()) (gocron.Job, error) { +func ScheduleJob(ctx context.Context, interval string, loc *time.Location, jobFunc func()) (gocron.Scheduler, error) { if loc == nil { loc = time.Local } - var job gocron.Job s, err := gocron.NewScheduler(gocron.WithLocation(loc)) if err != nil { - return job, fmt.Errorf("failed to create scheduler: %w", err) + return s, fmt.Errorf("failed to create scheduler: %w", err) } - jd, err := convertToJD(interval) + jd, err := ConvertToJobDef(interval) if err != nil { - return job, fmt.Errorf("failed to convert interval to job definition: %w", err) + return s, fmt.Errorf("failed to convert interval to job definition: %w", err) } // Schedule the job - if job, err = s.NewJob(jd, gocron.NewTask(jobFunc), gocron.WithContext(ctx)); err != nil { - return job, fmt.Errorf("failed to create job: %w", err) + if _, err = s.NewJob(jd, gocron.NewTask(jobFunc), gocron.WithContext(ctx)); err != nil { + return s, fmt.Errorf("failed to create job: %w", err) } - s.Start() - return job, nil + return s, nil } // ConvertToJobDef converts a string interval to a gocron.JobDefinition. -func convertToJD(interval string) (gocron.JobDefinition, error) { +func ConvertToJobDef(interval string) (gocron.JobDefinition, error) { // Parse the interval string // Interval could be in the format "1h", "30m", "15s" or "1h30m" or "04:05" var jd gocron.JobDefinition diff --git a/pkg/arr/arr.go b/pkg/arr/arr.go index f6efc81..c592194 100644 --- a/pkg/arr/arr.go +++ b/pkg/arr/arr.go @@ -3,13 +3,12 @@ package arr import ( "bytes" "context" + "encoding/json" "fmt" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" "github.com/sirrobot01/decypharr/internal/request" - "github.com/sirrobot01/decypharr/internal/utils" "io" "net/http" "strconv" @@ -122,6 +121,12 @@ type Storage struct { logger zerolog.Logger } +func (as *Storage) Cleanup() { + as.mu.Lock() + defer as.mu.Unlock() + as.Arrs = make(map[string]*Arr) +} + func InferType(host, name string) Type { switch { case strings.Contains(host, "sonarr") || strings.Contains(name, "sonarr"): @@ -183,10 +188,15 @@ func (as *Storage) Clear() { } func (as *Storage) StartSchedule(ctx context.Context) error { - // Schedule the cleanup job every 10 seconds - _, err := utils.ScheduleJob(ctx, "10s", nil, as.cleanupArrsQueue) - if err != nil { - return err + + ticker := time.NewTicker(10 * time.Second) + + select { + case <-ticker.C: + as.cleanupArrsQueue() + case <-ctx.Done(): + ticker.Stop() + return nil } return nil } diff --git a/pkg/arr/content.go b/pkg/arr/content.go index 1a0315d..dc4a45e 100644 --- a/pkg/arr/content.go +++ b/pkg/arr/content.go @@ -2,8 +2,8 @@ package arr import ( "context" + "encoding/json" "fmt" - "github.com/goccy/go-json" "golang.org/x/sync/errgroup" "net/http" "strconv" diff --git a/pkg/arr/history.go b/pkg/arr/history.go index 0892ede..202bed4 100644 --- a/pkg/arr/history.go +++ b/pkg/arr/history.go @@ -1,7 +1,7 @@ package arr import ( - "github.com/goccy/go-json" + "encoding/json" "io" "net/http" gourl "net/url" diff --git a/pkg/arr/import.go b/pkg/arr/import.go index 97a107f..9ef651b 100644 --- a/pkg/arr/import.go +++ b/pkg/arr/import.go @@ -1,8 +1,8 @@ package arr import ( + "encoding/json" "fmt" - "github.com/goccy/go-json" "io" "net/http" gourl "net/url" diff --git a/pkg/debrid/alldebrid/alldebrid.go b/pkg/debrid/alldebrid/alldebrid.go index ed89816..f1aa816 100644 --- a/pkg/debrid/alldebrid/alldebrid.go +++ b/pkg/debrid/alldebrid/alldebrid.go @@ -1,8 +1,8 @@ package alldebrid import ( + "encoding/json" "fmt" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" diff --git a/pkg/debrid/debrid/cache.go b/pkg/debrid/debrid/cache.go index 4d702f3..fafe6d5 100644 --- a/pkg/debrid/debrid/cache.go +++ b/pkg/debrid/debrid/cache.go @@ -16,8 +16,8 @@ import ( "sync/atomic" "time" + "encoding/json" "github.com/go-co-op/gocron/v2" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" @@ -85,6 +85,9 @@ type Cache struct { // repair repairChan chan RepairRequest + // readiness + ready chan struct{} + // config workers int torrentRefreshInterval string @@ -95,10 +98,10 @@ type Cache struct { downloadLinksRefreshMu sync.RWMutex // for refreshing download links torrentsRefreshMu sync.RWMutex // for refreshing torrents - scheduler gocron.Scheduler + scheduler gocron.Scheduler + cetScheduler gocron.Scheduler saveSemaphore chan struct{} - ctx context.Context config config.Debrid customFolders []string @@ -107,7 +110,8 @@ type Cache struct { func New(dc config.Debrid, client types.Client) *Cache { cfg := config.Get() cet, _ := time.LoadLocation("CET") - s, _ := gocron.NewScheduler(gocron.WithLocation(cet)) + cetSc, _ := gocron.NewScheduler(gocron.WithLocation(cet)) + scheduler, _ := gocron.NewScheduler(gocron.WithLocation(time.Local)) autoExpiresLinksAfter, err := time.ParseDuration(dc.AutoExpireLinksAfter) if autoExpiresLinksAfter == 0 || err != nil { @@ -145,54 +149,111 @@ func New(dc config.Debrid, client types.Client) *Cache { folderNaming: WebDavFolderNaming(dc.FolderNaming), autoExpiresLinksAfterDuration: autoExpiresLinksAfter, saveSemaphore: make(chan struct{}, 50), - ctx: context.Background(), - scheduler: s, + cetScheduler: cetSc, + scheduler: scheduler, config: dc, customFolders: customFolders, + + ready: make(chan struct{}), } + c.listingDebouncer = utils.NewDebouncer[bool](100*time.Millisecond, func(refreshRclone bool) { c.RefreshListings(refreshRclone) }) return c } +func (c *Cache) IsReady() chan struct{} { + return c.ready +} + +// Reset clears all internal state so the Cache can be reused without leaks. +// Call this after stopping the old Cache (so no goroutines are holding references), +// and before you discard the instance on a restart. +func (c *Cache) Reset() { + + if err := c.scheduler.StopJobs(); err != nil { + c.logger.Error().Err(err).Msg("Failed to stop scheduler jobs") + } + + if err := c.scheduler.Shutdown(); err != nil { + c.logger.Error().Err(err).Msg("Failed to stop scheduler") + } + + // Stop the listing debouncer + c.listingDebouncer.Stop() + + // Close the repair channel + close(c.repairChan) + + // 1. Reset torrent storage + c.torrents.reset() + + // 2. Reset download-link cache + c.downloadLinks.reset() + + // 3. Clear any sync.Maps + c.invalidDownloadLinks = sync.Map{} + c.repairRequest = sync.Map{} + c.failedToReinsert = sync.Map{} + c.downloadLinkRequests = sync.Map{} + + // 5. Rebuild the listing debouncer + c.listingDebouncer = utils.NewDebouncer[bool]( + 100*time.Millisecond, + func(refreshRclone bool) { + c.RefreshListings(refreshRclone) + }, + ) + + // 6. Reset repair channel so the next Start() can spin it up + c.repairChan = make(chan RepairRequest, 100) +} + func (c *Cache) Start(ctx context.Context) error { if err := os.MkdirAll(c.dir, 0755); err != nil { return fmt.Errorf("failed to create cache directory: %w", err) } - c.ctx = ctx - if err := c.Sync(); err != nil { + if err := c.Sync(ctx); err != nil { return fmt.Errorf("failed to sync cache: %w", err) } // initial download links - go func() { - c.refreshDownloadLinks() - }() + go c.refreshDownloadLinks(ctx) - if err := c.StartSchedule(); err != nil { + if err := c.StartSchedule(ctx); err != nil { c.logger.Error().Err(err).Msg("Failed to start cache worker") } c.repairChan = make(chan RepairRequest, 100) - go c.repairWorker() + go c.repairWorker(ctx) + + // Fire the ready channel + close(c.ready) + cfg := config.Get() + name := c.client.GetName() + addr := cfg.BindAddress + ":" + cfg.Port + cfg.URLBase + "webdav/" + name + "/" + c.logger.Info().Msgf("%s WebDav server running at %s", name, addr) + + <-ctx.Done() + c.logger.Info().Msgf("Stopping %s WebDav server", name) + c.Reset() return nil } -func (c *Cache) load() (map[string]CachedTorrent, error) { - torrents := make(map[string]CachedTorrent) +func (c *Cache) load(ctx context.Context) (map[string]CachedTorrent, error) { mu := sync.Mutex{} if err := os.MkdirAll(c.dir, 0755); err != nil { - return torrents, fmt.Errorf("failed to create cache directory: %w", err) + return nil, fmt.Errorf("failed to create cache directory: %w", err) } files, err := os.ReadDir(c.dir) if err != nil { - return torrents, fmt.Errorf("failed to read cache directory: %w", err) + return nil, fmt.Errorf("failed to read cache directory: %w", err) } // Get only json files @@ -204,7 +265,7 @@ func (c *Cache) load() (map[string]CachedTorrent, error) { } if len(jsonFiles) == 0 { - return torrents, nil + return nil, nil } // Create channels with appropriate buffering @@ -213,6 +274,8 @@ func (c *Cache) load() (map[string]CachedTorrent, error) { // Create a wait group for workers var wg sync.WaitGroup + torrents := make(map[string]CachedTorrent, len(jsonFiles)) + // Start workers for i := 0; i < c.workers; i++ { wg.Add(1) @@ -272,7 +335,7 @@ func (c *Cache) load() (map[string]CachedTorrent, error) { // Feed work to workers for _, file := range jsonFiles { select { - case <-c.ctx.Done(): + case <-ctx.Done(): break // Context cancelled default: workChan <- file @@ -288,13 +351,8 @@ func (c *Cache) load() (map[string]CachedTorrent, error) { return torrents, nil } -func (c *Cache) Sync() error { - cfg := config.Get() - name := c.client.GetName() - addr := cfg.BindAddress + ":" + cfg.Port + cfg.URLBase + "webdav/" + name + "/" - - defer c.logger.Info().Msgf("%s WebDav server running at %s", name, addr) - cachedTorrents, err := c.load() +func (c *Cache) Sync(ctx context.Context) error { + cachedTorrents, err := c.load(ctx) if err != nil { c.logger.Error().Err(err).Msg("Failed to load cache") } @@ -342,7 +400,7 @@ func (c *Cache) Sync() error { if len(newTorrents) > 0 { c.logger.Info().Msgf("Found %d new torrents", len(newTorrents)) - if err := c.sync(newTorrents); err != nil { + if err := c.sync(ctx, newTorrents); err != nil { return fmt.Errorf("failed to sync torrents: %v", err) } } @@ -350,7 +408,7 @@ func (c *Cache) Sync() error { return nil } -func (c *Cache) sync(torrents []*types.Torrent) error { +func (c *Cache) sync(ctx context.Context, torrents []*types.Torrent) error { // Create channels with appropriate buffering workChan := make(chan *types.Torrent, min(c.workers, len(torrents))) @@ -384,7 +442,7 @@ func (c *Cache) sync(torrents []*types.Torrent) error { c.logger.Info().Msgf("Progress: %d/%d torrents processed", count, len(torrents)) } - case <-c.ctx.Done(): + case <-ctx.Done(): return // Context cancelled, exit goroutine } } @@ -396,7 +454,7 @@ func (c *Cache) sync(torrents []*types.Torrent) error { select { case workChan <- t: // Work sent successfully - case <-c.ctx.Done(): + case <-ctx.Done(): break // Context cancelled } } @@ -443,7 +501,7 @@ func (c *Cache) setTorrent(t CachedTorrent, callback func(torrent CachedTorrent) updatedTorrent.Files = mergedFiles } c.torrents.set(torrentName, t, updatedTorrent) - c.SaveTorrent(updatedTorrent) + c.SaveTorrent(t) if callback != nil { callback(updatedTorrent) } diff --git a/pkg/debrid/debrid/download_link.go b/pkg/debrid/debrid/download_link.go index 869a411..9499dcb 100644 --- a/pkg/debrid/debrid/download_link.go +++ b/pkg/debrid/debrid/download_link.go @@ -26,6 +26,13 @@ func newDownloadLinkCache() *downloadLinkCache { data: make(map[string]linkCache), } } + +func (c *downloadLinkCache) reset() { + c.mu.Lock() + c.data = make(map[string]linkCache) + c.mu.Unlock() +} + func (c *downloadLinkCache) Load(key string) (linkCache, bool) { c.mu.Lock() defer c.mu.Unlock() @@ -155,14 +162,13 @@ func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (strin func (c *Cache) GenerateDownloadLinks(t CachedTorrent) { if err := c.client.GenerateDownloadLinks(t.Torrent); err != nil { - c.logger.Error().Err(err).Msg("Failed to generate download links") + c.logger.Error().Err(err).Str("torrent", t.Name).Msg("Failed to generate download links") return } for _, file := range t.Files { if file.DownloadLink != nil { c.updateDownloadLink(file.DownloadLink) } - } c.setTorrent(t, nil) } diff --git a/pkg/debrid/debrid/engine.go b/pkg/debrid/debrid/engine.go index a5e4996..b2412c5 100644 --- a/pkg/debrid/debrid/engine.go +++ b/pkg/debrid/debrid/engine.go @@ -7,11 +7,11 @@ import ( ) type Engine struct { - Clients map[string]types.Client + Clients map[string]types.Client clientsMu sync.Mutex - Caches map[string]*Cache - CacheMu sync.Mutex - LastUsed string + Caches map[string]*Cache + CacheMu sync.Mutex + LastUsed string } func NewEngine() *Engine { @@ -46,6 +46,16 @@ func (d *Engine) GetClient(name string) types.Client { return d.Clients[name] } +func (d *Engine) Reset() { + d.clientsMu.Lock() + d.Clients = make(map[string]types.Client) + d.clientsMu.Unlock() + + d.CacheMu.Lock() + d.Caches = make(map[string]*Cache) + d.CacheMu.Unlock() +} + func (d *Engine) GetDebrids() map[string]types.Client { return d.Clients } diff --git a/pkg/debrid/debrid/refresh.go b/pkg/debrid/debrid/refresh.go index af54993..cf9c728 100644 --- a/pkg/debrid/debrid/refresh.go +++ b/pkg/debrid/debrid/refresh.go @@ -1,6 +1,7 @@ package debrid import ( + "context" "fmt" "github.com/sirrobot01/decypharr/pkg/debrid/types" "io" @@ -34,25 +35,23 @@ func (c *Cache) RefreshListings(refreshRclone bool) { if refreshRclone { if err := c.refreshRclone(); err != nil { - c.logger.Trace().Err(err).Msg("Failed to refresh rclone") // silent error + c.logger.Error().Err(err).Msg("Failed to refresh rclone") // silent error } } } -func (c *Cache) refreshTorrents() { - // Use a mutex to prevent concurrent refreshes - if c.torrentsRefreshMu.TryLock() { - defer c.torrentsRefreshMu.Unlock() - } else { - return - } - +func (c *Cache) refreshTorrents(ctx context.Context) { select { - case <-c.ctx.Done(): + case <-ctx.Done(): return default: } + if !c.torrentsRefreshMu.TryLock() { + return + } + defer c.torrentsRefreshMu.Unlock() + // Get all torrents from the debrid service debTorrents, err := c.client.GetTorrents() if err != nil { @@ -72,7 +71,8 @@ func (c *Cache) refreshTorrents() { // Let's implement deleting torrents removed from debrid deletedTorrents := make([]string, 0) - for _, id := range c.torrents.getAllIDs() { + cachedTorrents := c.torrents.getIdMaps() + for id := range cachedTorrents { if _, exists := currentTorrentIds[id]; !exists { deletedTorrents = append(deletedTorrents, id) } @@ -83,9 +83,8 @@ func (c *Cache) refreshTorrents() { } newTorrents := make([]*types.Torrent, 0) - cachedIdsMaps := c.torrents.getIdMaps() for _, t := range debTorrents { - if _, exists := cachedIdsMaps[t.Id]; !exists { + if _, exists := cachedTorrents[t.Id]; !exists { newTorrents = append(newTorrents, t) } } @@ -129,12 +128,6 @@ func (c *Cache) refreshTorrents() { func (c *Cache) refreshRclone() error { cfg := c.config - select { - case <-c.ctx.Done(): - return nil - default: - } - if cfg.RcUrl == "" { return nil } @@ -214,12 +207,6 @@ func (c *Cache) refreshTorrent(torrentId string) *CachedTorrent { return nil } - select { - case <-c.ctx.Done(): - return nil - default: - } - torrent, err := c.client.GetTorrent(torrentId) if err != nil { c.logger.Error().Err(err).Msgf("Failed to get torrent %s", torrentId) @@ -241,19 +228,19 @@ func (c *Cache) refreshTorrent(torrentId string) *CachedTorrent { return &ct } -func (c *Cache) refreshDownloadLinks() { - if c.downloadLinksRefreshMu.TryLock() { - defer c.downloadLinksRefreshMu.Unlock() - } else { - return - } +func (c *Cache) refreshDownloadLinks(ctx context.Context) { select { - case <-c.ctx.Done(): + case <-ctx.Done(): return default: } + if !c.downloadLinksRefreshMu.TryLock() { + return + } + defer c.downloadLinksRefreshMu.Unlock() + downloadLinks, err := c.client.GetDownloads() if err != nil { diff --git a/pkg/debrid/debrid/repair.go b/pkg/debrid/debrid/repair.go index 3151516..5a0df5c 100644 --- a/pkg/debrid/debrid/repair.go +++ b/pkg/debrid/debrid/repair.go @@ -1,6 +1,7 @@ package debrid import ( + "context" "errors" "fmt" "github.com/sirrobot01/decypharr/internal/request" @@ -122,12 +123,11 @@ func (c *Cache) IsTorrentBroken(t *CachedTorrent, filenames []string) bool { return isBroken } -func (c *Cache) repairWorker() { +func (c *Cache) repairWorker(ctx context.Context) { // This watches a channel for torrents to repair and can be cancelled via context for { select { - case <-c.ctx.Done(): - // Context was cancelled, exit the goroutine + case <-ctx.Done(): return case req, ok := <-c.repairChan: diff --git a/pkg/debrid/debrid/torrent.go b/pkg/debrid/debrid/torrent.go index 3c35765..c782765 100644 --- a/pkg/debrid/debrid/torrent.go +++ b/pkg/debrid/debrid/torrent.go @@ -73,6 +73,22 @@ func newTorrentCache(dirFilters map[string][]directoryFilter) *torrentCache { return tc } +func (tc *torrentCache) reset() { + tc.mu.Lock() + tc.byID = make(map[string]CachedTorrent) + tc.byName = make(map[string]CachedTorrent) + tc.mu.Unlock() + + // reset the sorted listing + tc.sortNeeded.Store(false) + tc.listing.Store(make([]os.FileInfo, 0)) + + // reset any per-folder views + tc.folderListingMu.Lock() + tc.folderListing = make(map[string][]os.FileInfo) + tc.folderListingMu.Unlock() +} + func (tc *torrentCache) getByID(id string) (CachedTorrent, bool) { tc.mu.Lock() defer tc.mu.Unlock() @@ -251,22 +267,12 @@ func (tc *torrentCache) getAll() map[string]CachedTorrent { tc.mu.Lock() defer tc.mu.Unlock() result := make(map[string]CachedTorrent) - for name, torrent := range tc.byName { + for name, torrent := range tc.byID { result[name] = torrent } return result } -func (tc *torrentCache) getAllIDs() []string { - tc.mu.Lock() - defer tc.mu.Unlock() - ids := make([]string, 0, len(tc.byID)) - for id := range tc.byID { - ids = append(ids, id) - } - return ids -} - func (tc *torrentCache) getIdMaps() map[string]struct{} { tc.mu.Lock() defer tc.mu.Unlock() diff --git a/pkg/debrid/debrid/worker.go b/pkg/debrid/debrid/worker.go index 8602144..b0a3705 100644 --- a/pkg/debrid/debrid/worker.go +++ b/pkg/debrid/debrid/worker.go @@ -1,32 +1,60 @@ package debrid import ( + "context" + "github.com/go-co-op/gocron/v2" "github.com/sirrobot01/decypharr/internal/utils" - "time" ) -func (c *Cache) StartSchedule() error { +func (c *Cache) StartSchedule(ctx context.Context) error { // For now, we just want to refresh the listing and download links - if _, err := utils.ScheduleJob(c.ctx, c.downloadLinksRefreshInterval, nil, c.refreshDownloadLinks); err != nil { - c.logger.Error().Err(err).Msg("Failed to add download link refresh job") + // Schedule download link refresh job + if jd, err := utils.ConvertToJobDef(c.downloadLinksRefreshInterval); err != nil { + c.logger.Error().Err(err).Msg("Failed to convert download link refresh interval to job definition") } else { - c.logger.Debug().Msgf("Download link refresh job scheduled for every %s", c.downloadLinksRefreshInterval) + // Schedule the job + if _, err := c.scheduler.NewJob(jd, gocron.NewTask(func() { + c.refreshDownloadLinks(ctx) + }), gocron.WithContext(ctx)); err != nil { + c.logger.Error().Err(err).Msg("Failed to create download link refresh job") + } else { + c.logger.Debug().Msgf("Download link refresh job scheduled for every %s", c.downloadLinksRefreshInterval) + } } - if _, err := utils.ScheduleJob(c.ctx, c.torrentRefreshInterval, nil, c.refreshTorrents); err != nil { - c.logger.Error().Err(err).Msg("Failed to add torrent refresh job") + // Schedule torrent refresh job + if jd, err := utils.ConvertToJobDef(c.torrentRefreshInterval); err != nil { + c.logger.Error().Err(err).Msg("Failed to convert torrent refresh interval to job definition") } else { - c.logger.Debug().Msgf("Torrent refresh job scheduled for every %s", c.torrentRefreshInterval) + // Schedule the job + if _, err := c.scheduler.NewJob(jd, gocron.NewTask(func() { + c.refreshTorrents(ctx) + }), gocron.WithContext(ctx)); err != nil { + c.logger.Error().Err(err).Msg("Failed to create torrent refresh job") + } else { + c.logger.Debug().Msgf("Torrent refresh job scheduled for every %s", c.torrentRefreshInterval) + } } // Schedule the reset invalid links job // This job will run every at 00:00 CET // and reset the invalid links in the cache - cet, _ := time.LoadLocation("CET") - if _, err := utils.ScheduleJob(c.ctx, "00:00", cet, c.resetInvalidLinks); err != nil { - c.logger.Error().Err(err).Msg("Failed to add reset invalid links job") + if jd, err := utils.ConvertToJobDef("00:00"); err != nil { + c.logger.Error().Err(err).Msg("Failed to convert link reset interval to job definition") + } else { + // Schedule the job + if _, err := c.cetScheduler.NewJob(jd, gocron.NewTask(func() { + c.resetInvalidLinks() + }), gocron.WithContext(ctx)); err != nil { + c.logger.Error().Err(err).Msg("Failed to create link reset job") + } else { + c.logger.Debug().Msgf("Link reset job scheduled for every midnight, CET") + } } + // Start the scheduler + c.scheduler.Start() + c.cetScheduler.Start() return nil } diff --git a/pkg/debrid/debrid_link/debrid_link.go b/pkg/debrid/debrid_link/debrid_link.go index d73545b..f547db7 100644 --- a/pkg/debrid/debrid_link/debrid_link.go +++ b/pkg/debrid/debrid_link/debrid_link.go @@ -2,8 +2,8 @@ package debrid_link import ( "bytes" + "encoding/json" "fmt" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" diff --git a/pkg/debrid/realdebrid/realdebrid.go b/pkg/debrid/realdebrid/realdebrid.go index 94d68d2..c5ff227 100644 --- a/pkg/debrid/realdebrid/realdebrid.go +++ b/pkg/debrid/realdebrid/realdebrid.go @@ -2,9 +2,9 @@ package realdebrid import ( "bytes" + "encoding/json" "errors" "fmt" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" diff --git a/pkg/debrid/realdebrid/types.go b/pkg/debrid/realdebrid/types.go index dcb789a..ab6879a 100644 --- a/pkg/debrid/realdebrid/types.go +++ b/pkg/debrid/realdebrid/types.go @@ -1,8 +1,8 @@ package realdebrid import ( + "encoding/json" "fmt" - "github.com/goccy/go-json" "time" ) diff --git a/pkg/debrid/torbox/torbox.go b/pkg/debrid/torbox/torbox.go index fdea20d..d4e0c04 100644 --- a/pkg/debrid/torbox/torbox.go +++ b/pkg/debrid/torbox/torbox.go @@ -2,8 +2,8 @@ package torbox import ( "bytes" + "encoding/json" "fmt" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" diff --git a/pkg/qbit/qbit.go b/pkg/qbit/qbit.go index 552ca17..732d411 100644 --- a/pkg/qbit/qbit.go +++ b/pkg/qbit/qbit.go @@ -42,3 +42,11 @@ func New() *QBit { downloadSemaphore: make(chan struct{}, cmp.Or(cfg.MaxDownloads, 5)), } } + +func (q *QBit) Reset() { + if q.Storage != nil { + q.Storage.Reset() + } + q.Tags = nil + close(q.downloadSemaphore) +} diff --git a/pkg/qbit/storage.go b/pkg/qbit/storage.go index fba068f..e2671bb 100644 --- a/pkg/qbit/storage.go +++ b/pkg/qbit/storage.go @@ -1,8 +1,8 @@ package qbit import ( + "encoding/json" "fmt" - "github.com/goccy/go-json" "github.com/sirrobot01/decypharr/pkg/service" "os" "sort" @@ -272,3 +272,9 @@ func (ts *TorrentStorage) saveToFile() error { } return os.WriteFile(ts.filename, data, 0644) } + +func (ts *TorrentStorage) Reset() { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.torrents = make(Torrents) +} diff --git a/pkg/repair/repair.go b/pkg/repair/repair.go index eb34934..f59267e 100644 --- a/pkg/repair/repair.go +++ b/pkg/repair/repair.go @@ -2,8 +2,9 @@ package repair import ( "context" + "encoding/json" "fmt" - "github.com/goccy/go-json" + "github.com/go-co-op/gocron/v2" "github.com/google/uuid" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" @@ -38,9 +39,35 @@ type Repair struct { logger zerolog.Logger filename string workers int + scheduler gocron.Scheduler ctx context.Context } +type JobStatus string + +const ( + JobStarted JobStatus = "started" + JobPending JobStatus = "pending" + JobFailed JobStatus = "failed" + JobCompleted JobStatus = "completed" + JobProcessing JobStatus = "processing" +) + +type Job struct { + ID string `json:"id"` + Arrs []string `json:"arrs"` + MediaIDs []string `json:"media_ids"` + StartedAt time.Time `json:"created_at"` + BrokenItems map[string][]arr.ContentFile `json:"broken_items"` + Status JobStatus `json:"status"` + CompletedAt time.Time `json:"finished_at"` + FailedAt time.Time `json:"failed_at"` + AutoProcess bool `json:"auto_process"` + Recurrent bool `json:"recurrent"` + + Error string `json:"error"` +} + func New(arrs *arr.Storage, engine *debrid.Engine) *Repair { cfg := config.Get() workers := runtime.NumCPU() * 20 @@ -69,8 +96,24 @@ func New(arrs *arr.Storage, engine *debrid.Engine) *Repair { return r } +func (r *Repair) Reset() { + // Stop scheduler + if r.scheduler != nil { + if err := r.scheduler.StopJobs(); err != nil { + r.logger.Error().Err(err).Msg("Error stopping scheduler") + } + + if err := r.scheduler.Shutdown(); err != nil { + r.logger.Error().Err(err).Msg("Error shutting down scheduler") + } + } + // Reset jobs + r.Jobs = make(map[string]*Job) + +} + func (r *Repair) Start(ctx context.Context) error { - r.ctx = ctx + //r.ctx = ctx if r.runOnStart { r.logger.Info().Msgf("Running initial repair") go func() { @@ -80,47 +123,33 @@ func (r *Repair) Start(ctx context.Context) error { }() } - job, err := utils.ScheduleJob(r.ctx, r.interval, time.Local, func() { - r.logger.Info().Msgf("Repair job started at %s", time.Now().Format("15:04:05")) - if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil { - r.logger.Error().Err(err).Msg("Error running repair job") + r.scheduler, _ = gocron.NewScheduler(gocron.WithLocation(time.Local)) + + if jd, err := utils.ConvertToJobDef(r.interval); err != nil { + r.logger.Error().Err(err).Str("interval", r.interval).Msg("Error converting interval") + } else { + _, err2 := r.scheduler.NewJob(jd, gocron.NewTask(func() { + r.logger.Info().Msgf("Repair job started at %s", time.Now().Format("15:04:05")) + if err := r.AddJob([]string{}, []string{}, r.autoProcess, true); err != nil { + r.logger.Error().Err(err).Msg("Error running repair job") + } + })) + if err2 != nil { + r.logger.Error().Err(err2).Msg("Error creating repair job") + } else { + r.scheduler.Start() + r.logger.Info().Msgf("Repair job scheduled every %s", r.interval) } - }) - if err != nil { - r.logger.Error().Err(err).Msg("Error scheduling repair job") - return err - } - if t, err := job.NextRun(); err == nil { - r.logger.Info().Msgf("Next repair job scheduled at %s", t.Format("15:04:05")) } + + <-ctx.Done() + + r.logger.Info().Msg("Stopping repair scheduler") + r.Reset() + return nil } -type JobStatus string - -const ( - JobStarted JobStatus = "started" - JobPending JobStatus = "pending" - JobFailed JobStatus = "failed" - JobCompleted JobStatus = "completed" - JobProcessing JobStatus = "processing" -) - -type Job struct { - ID string `json:"id"` - Arrs []string `json:"arrs"` - MediaIDs []string `json:"media_ids"` - StartedAt time.Time `json:"created_at"` - BrokenItems map[string][]arr.ContentFile `json:"broken_items"` - Status JobStatus `json:"status"` - CompletedAt time.Time `json:"finished_at"` - FailedAt time.Time `json:"failed_at"` - AutoProcess bool `json:"auto_process"` - Recurrent bool `json:"recurrent"` - - Error string `json:"error"` -} - func (j *Job) discordContext() string { format := ` **ID**: %s @@ -737,7 +766,7 @@ func (r *Repair) loadFromFile() { _jobs := make(map[string]*Job) err = json.Unmarshal(data, &_jobs) if err != nil { - r.logger.Trace().Err(err).Msg("Failed to unmarshal jobs; resetting") + r.logger.Error().Err(err).Msg("Failed to unmarshal jobs; resetting") r.Jobs = make(map[string]*Job) return } @@ -765,3 +794,12 @@ func (r *Repair) DeleteJobs(ids []string) { } go r.saveToFile() } + +// Cleanup Cleans up the repair instance +func (r *Repair) Cleanup() { + r.Jobs = make(map[string]*Job) + r.arrs = nil + r.deb = nil + r.ctx = nil + r.logger.Info().Msg("Repair stopped") +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 2dd84f5..4640ef0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -4,20 +4,17 @@ import ( "context" "errors" "fmt" - "io" - "net/http" - "net/url" - "os" - "os/signal" - "runtime" - "syscall" - "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" "github.com/sirrobot01/decypharr/internal/request" + "io" + "net/http" + "net/url" + "os" + "runtime" ) type Server struct { @@ -69,13 +66,9 @@ func (s *Server) Start(ctx context.Context) error { Handler: s.router, } - ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) - defer stop() - go func() { if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { s.logger.Info().Msgf("Error starting server: %v", err) - stop() } }() @@ -124,7 +117,7 @@ func (s *Server) getStats(w http.ResponseWriter, r *http.Request) { // Memory stats "heap_alloc_mb": fmt.Sprintf("%.2fMB", float64(memStats.HeapAlloc)/1024/1024), "total_alloc_mb": fmt.Sprintf("%.2fMB", float64(memStats.TotalAlloc)/1024/1024), - "sys_mb": fmt.Sprintf("%.2fMB", float64(memStats.Sys)/1024/1024), + "memory_used": fmt.Sprintf("%.2fMB", float64(memStats.Sys)/1024/1024), // GC stats "gc_cycles": memStats.NumGC, @@ -133,6 +126,11 @@ func (s *Server) getStats(w http.ResponseWriter, r *http.Request) { // System info "num_cpu": runtime.NumCPU(), + + // OS info + "os": runtime.GOOS, + "arch": runtime.GOARCH, + "go_version": runtime.Version(), } request.JSONResponse(w, stats, http.StatusOK) } diff --git a/pkg/server/webhook.go b/pkg/server/webhook.go index 7e5b60e..0977a56 100644 --- a/pkg/server/webhook.go +++ b/pkg/server/webhook.go @@ -2,7 +2,7 @@ package server import ( "cmp" - "github.com/goccy/go-json" + "encoding/json" "github.com/sirrobot01/decypharr/pkg/service" "net/http" ) diff --git a/pkg/service/service.go b/pkg/service/service.go index e5c1cc3..eb7378b 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -33,6 +33,17 @@ func GetService() *Service { } func Reset() { + if instance != nil { + if instance.Debrid != nil { + instance.Debrid.Reset() + } + if instance.Arr != nil { + //instance.Arr.Reset() + } + if instance.Repair != nil { + //instance.Repair.Reset() + } + } once = sync.Once{} instance = nil } diff --git a/pkg/web/api.go b/pkg/web/api.go index 66d3e1d..dbc3bbd 100644 --- a/pkg/web/api.go +++ b/pkg/web/api.go @@ -6,8 +6,8 @@ import ( "strings" "time" + "encoding/json" "github.com/go-chi/chi/v5" - "github.com/goccy/go-json" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/request" "github.com/sirrobot01/decypharr/internal/utils" @@ -35,6 +35,9 @@ func (ui *Handler) handleAddContent(w http.ResponseWriter, r *http.Request) { arrName := r.FormValue("arr") notSymlink := r.FormValue("notSymlink") == "true" downloadUncached := r.FormValue("downloadUncached") == "true" + if arrName == "" { + arrName = "uncategorized" + } _arr := svc.Arr.Get(arrName) if _arr == nil { diff --git a/pkg/web/templates/layout.html b/pkg/web/templates/layout.html index ad9d405..1da7133 100644 --- a/pkg/web/templates/layout.html +++ b/pkg/web/templates/layout.html @@ -68,17 +68,6 @@ font-weight: 500; } - .badge#channel-badge { - background-color: #0d6efd; - } - - .badge#channel-badge.beta { - background-color: #fd7e14; - } - .badge#channel-badge.nightly { - background-color: #6c757d; - } - .table { color: var(--text-color); } @@ -180,7 +169,9 @@ - Loading... + + Stats + Loading... @@ -323,22 +314,20 @@ .then(response => response.json()) .then(data => { const versionBadge = document.getElementById('version-badge'); - const channelBadge = document.getElementById('channel-badge'); // Add url to version badge - versionBadge.innerHTML = `${data.version}`; - channelBadge.textContent = data.channel.charAt(0).toUpperCase() + data.channel.slice(1); + versionBadge.innerHTML = `${data.channel}-${data.version}`; + if (data.channel === 'beta') { - channelBadge.classList.add('beta'); + versionBadge.classList.add('beta'); } else if (data.channel === 'nightly') { - channelBadge.classList.add('nightly'); + versionBadge.classList.add('nightly'); } }) .catch(error => { console.error('Error fetching version:', error); document.getElementById('version-badge').textContent = 'Unknown'; - document.getElementById('channel-badge').textContent = 'Unknown'; }); }); diff --git a/pkg/web/ui.go b/pkg/web/ui.go index a52619c..b7659a5 100644 --- a/pkg/web/ui.go +++ b/pkg/web/ui.go @@ -1,7 +1,7 @@ package web import ( - "github.com/goccy/go-json" + "encoding/json" "github.com/sirrobot01/decypharr/internal/config" "golang.org/x/crypto/bcrypt" "net/http" diff --git a/pkg/webdav/misc.go b/pkg/webdav/misc.go index 37de33f..71aacb0 100644 --- a/pkg/webdav/misc.go +++ b/pkg/webdav/misc.go @@ -1,9 +1,16 @@ package webdav import ( + "github.com/go-chi/chi/v5" + "github.com/sirrobot01/decypharr/internal/utils" + "github.com/stanNthe5/stringbuf" + "net/http" "net/url" "os" + "path" + "strconv" "strings" + "time" ) // getName: Returns the torrent name and filename from the path @@ -49,3 +56,109 @@ func fastEscapePath(p string) string { } return b.String() } + +type entry struct { + escHref string // already XML-safe + percent-escaped + escName string + size int64 + isDir bool + modTime string +} + +func filesToXML(urlPath string, fi os.FileInfo, children []os.FileInfo) stringbuf.StringBuf { + + now := time.Now().UTC().Format("2006-01-02T15:04:05.000-07:00") + entries := make([]entry, 0, len(children)+1) + + // Add the current file itself + entries = append(entries, entry{ + escHref: xmlEscape(fastEscapePath(urlPath)), + escName: xmlEscape(fi.Name()), + isDir: fi.IsDir(), + size: fi.Size(), + modTime: fi.ModTime().Format("2006-01-02T15:04:05.000-07:00"), + }) + for _, info := range children { + + nm := info.Name() + // build raw href + href := path.Join("/", urlPath, nm) + if info.IsDir() { + href += "/" + } + + entries = append(entries, entry{ + escHref: xmlEscape(fastEscapePath(href)), + escName: xmlEscape(nm), + isDir: info.IsDir(), + size: info.Size(), + modTime: info.ModTime().Format("2006-01-02T15:04:05.000-07:00"), + }) + } + + sb := builderPool.Get().(stringbuf.StringBuf) + sb.Reset() + defer builderPool.Put(sb) + + // XML header and main element + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(``) + + // Add responses for each entry + for _, e := range entries { + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(e.escHref) + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(``) + + if e.isDir { + _, _ = sb.WriteString(``) + } else { + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(strconv.FormatInt(e.size, 10)) + _, _ = sb.WriteString(``) + } + + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(now) + _, _ = sb.WriteString(``) + + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(e.escName) + _, _ = sb.WriteString(``) + + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(`HTTP/1.1 200 OK`) + _, _ = sb.WriteString(``) + _, _ = sb.WriteString(``) + } + + // Close root element + _, _ = sb.WriteString(``) + return sb +} + +func writeXml(w http.ResponseWriter, status int, buf stringbuf.StringBuf) { + w.Header().Set("Content-Type", "application/xml; charset=utf-8") + w.WriteHeader(status) + _, _ = w.Write(buf.Bytes()) +} + +func getParam(r *http.Request, key string) string { + if r.URL == nil || r.URL.Query() == nil { + return "" + } + if v := chi.URLParam(r, key); v != "" { + return utils.PathUnescape(v) + } + if v := r.URL.Query().Get(key); v != "" { + return utils.PathUnescape(v) + } + if v := r.FormValue(key); v != "" { + return utils.PathUnescape(v) + } + return "" +} diff --git a/pkg/webdav/webdav.go b/pkg/webdav/webdav.go index 342421e..a175340 100644 --- a/pkg/webdav/webdav.go +++ b/pkg/webdav/webdav.go @@ -5,13 +5,17 @@ import ( "embed" "fmt" "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/pkg/service" "html/template" "net/http" "net/url" + "os" + "path" "strings" "sync" + "time" ) //go:embed templates/* @@ -70,9 +74,18 @@ var ( tplDirectory = template.Must(template.New("").Funcs(funcMap).ParseFS(templatesFS, "templates/directory.html")) ) +func init() { + chi.RegisterMethod("PROPFIND") + chi.RegisterMethod("PROPPATCH") + chi.RegisterMethod("MKCOL") + chi.RegisterMethod("COPY") + chi.RegisterMethod("MOVE") + chi.RegisterMethod("LOCK") + chi.RegisterMethod("UNLOCK") +} + type WebDav struct { Handlers []*Handler - ready chan struct{} URLBase string } @@ -81,7 +94,6 @@ func New() *WebDav { urlBase := config.Get().URLBase w := &WebDav{ Handlers: make([]*Handler, 0), - ready: make(chan struct{}), URLBase: urlBase, } for name, c := range svc.Debrid.Caches { @@ -92,32 +104,10 @@ func New() *WebDav { } func (wd *WebDav) Routes() http.Handler { - chi.RegisterMethod("PROPFIND") - chi.RegisterMethod("PROPPATCH") - chi.RegisterMethod("MKCOL") - chi.RegisterMethod("COPY") - chi.RegisterMethod("MOVE") - chi.RegisterMethod("LOCK") - chi.RegisterMethod("UNLOCK") wr := chi.NewRouter() + wr.Use(middleware.StripSlashes) wr.Use(wd.commonMiddleware) - // Create a readiness check middleware - readinessMiddleware := func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - select { - case <-wd.ready: - // WebDAV is ready, proceed - next.ServeHTTP(w, r) - default: - // WebDAV is still initializing - w.Header().Set("Retry-After", "10") - http.Error(w, "WebDAV service is initializing, please try again shortly", http.StatusServiceUnavailable) - } - }) - } - wr.Use(readinessMiddleware) - wd.setupRootHandler(wr) wd.mountHandlers(wr) @@ -145,9 +135,6 @@ func (wd *WebDav) Start(ctx context.Context) error { go func() { wg.Wait() close(errChan) - - // Signal that WebDAV is ready - close(wd.ready) }() // Collect all errors @@ -171,7 +158,8 @@ func (wd *WebDav) mountHandlers(r chi.Router) { } func (wd *WebDav) setupRootHandler(r chi.Router) { - r.Get("/", wd.handleRoot()) + r.Get("/", wd.handleGetRoot()) + r.MethodFunc("PROPFIND", "/", wd.handleWebdavRoot()) } func (wd *WebDav) commonMiddleware(next http.Handler) http.Handler { @@ -186,7 +174,7 @@ func (wd *WebDav) commonMiddleware(next http.Handler) http.Handler { }) } -func (wd *WebDav) handleRoot() http.HandlerFunc { +func (wd *WebDav) handleGetRoot() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") @@ -202,3 +190,28 @@ func (wd *WebDav) handleRoot() http.HandlerFunc { } } } + +func (wd *WebDav) handleWebdavRoot() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + fi := &FileInfo{ + name: "/", + size: 0, + mode: 0755 | os.ModeDir, + modTime: time.Now(), + isDir: true, + } + children := make([]os.FileInfo, 0, len(wd.Handlers)) + for _, h := range wd.Handlers { + children = append(children, &FileInfo{ + name: h.Name, + size: 0, + mode: 0755 | os.ModeDir, + modTime: time.Now(), + isDir: true, + }) + } + sb := filesToXML(path.Clean(r.URL.Path), fi, children) + writeXml(w, http.StatusMultiStatus, sb) + + } +}