4 Commits

Author SHA1 Message Date
Mukhtar Akere
2548c21e5b Fix rclone file log
Some checks failed
GoReleaser / goreleaser (push) Has been cancelled
Release Docker Build / docker (push) Has been cancelled
2025-08-19 01:01:53 +01:00
Mukhtar Akere
1b03ccefbb Hotfix rclone logging flags 2025-08-19 00:55:43 +01:00
Mukhtar Akere
e3a249a9cc Fix issues with rclone mounting
Some checks failed
GoReleaser / goreleaser (push) Has been cancelled
Release Docker Build / docker (push) Has been cancelled
2025-08-18 22:12:26 +01:00
Mukhtar Akere
8696db42d2 - Add more rclone supports
- Add rclone log viewer
- Add more stats to Stats page
- Fix some minor bugs
2025-08-18 01:57:02 +01:00
37 changed files with 749 additions and 215 deletions

View File

@@ -42,8 +42,20 @@ LABEL org.opencontainers.image.authors = "sirrobot01"
LABEL org.opencontainers.image.documentation = "https://github.com/sirrobot01/decypharr/blob/main/README.md"
# Install dependencies including rclone
RUN apk add --no-cache fuse3 ca-certificates su-exec shadow rclone && \
echo "user_allow_other" >> /etc/fuse.conf
RUN apk add --no-cache fuse3 ca-certificates su-exec shadow curl unzip && \
echo "user_allow_other" >> /etc/fuse.conf && \
case "$(uname -m)" in \
x86_64) ARCH=amd64 ;; \
aarch64) ARCH=arm64 ;; \
armv7l) ARCH=arm ;; \
*) echo "Unsupported architecture: $(uname -m)" && exit 1 ;; \
esac && \
curl -O "https://downloads.rclone.org/rclone-current-linux-${ARCH}.zip" && \
unzip "rclone-current-linux-${ARCH}.zip" && \
cp rclone-*/rclone /usr/local/bin/ && \
chmod +x /usr/local/bin/rclone && \
rm -rf rclone-* && \
apk del curl unzip
# Copy binaries and entrypoint
COPY --from=builder /decypharr /usr/bin/decypharr

View File

@@ -40,6 +40,7 @@ func Start(ctx context.Context) error {
svcCtx, cancelSvc := context.WithCancel(ctx)
defer cancelSvc()
// Create the logger path if it doesn't exist
for {
cfg := config.Get()
_log := logger.Default()
@@ -157,14 +158,6 @@ func startServices(ctx context.Context, cancelSvc context.CancelFunc, wd *webdav
return rcManager.Start(ctx)
})
safeGo(func() error {
arr := store.Get().Arr()
if arr == nil {
return nil
}
return arr.StartSchedule(ctx)
})
if cfg := config.Get(); cfg.Repair.Enabled {
safeGo(func() error {
repair := store.Get().Repair()
@@ -178,7 +171,8 @@ func startServices(ctx context.Context, cancelSvc context.CancelFunc, wd *webdav
}
safeGo(func() error {
return store.Get().StartQueueSchedule(ctx)
store.Get().StartWorkers(ctx)
return nil
})
go func() {

View File

@@ -51,8 +51,12 @@ services:
- /dev/fuse:/dev/fuse:rwm
cap_add:
- SYS_ADMIN
security_opt:
- apparmor:unconfined
environment:
- UMASK=002
- PUID=1000 # Change to your user ID
- PGID=1000 # Change to your group ID
```
**Important Docker Notes:**

View File

@@ -32,6 +32,7 @@ type Debrid struct {
APIKey string `json:"api_key,omitempty"`
DownloadAPIKeys []string `json:"download_api_keys,omitempty"`
Folder string `json:"folder,omitempty"`
RcloneMountPath string `json:"rclone_mount_path,omitempty"` // Custom rclone mount path for this debrid service
DownloadUncached bool `json:"download_uncached,omitempty"`
CheckCached bool `json:"check_cached,omitempty"`
RateLimit string `json:"rate_limit,omitempty"` // 200/minute or 10/second
@@ -117,6 +118,8 @@ type Rclone struct {
// Performance settings
NoModTime bool `json:"no_modtime,omitempty"` // Don't read/write modification time
NoChecksum bool `json:"no_checksum,omitempty"` // Don't checksum files on upload
LogLevel string `json:"log_level,omitempty"`
}
type Config struct {
@@ -430,6 +433,7 @@ func (c *Config) setDefaults() {
c.Rclone.VfsCachePollInterval = cmp.Or(c.Rclone.VfsCachePollInterval, "1m") // Clean cache every minute
}
c.Rclone.DirCacheTime = cmp.Or(c.Rclone.DirCacheTime, "5m")
c.Rclone.LogLevel = cmp.Or(c.Rclone.LogLevel, "INFO")
}
// Load the auth file
c.Auth = c.GetAuth()

View File

@@ -26,7 +26,7 @@ func GetLogPath() string {
}
}
return filepath.Join(logsDir, "decypharr.log")
return logsDir
}
func New(prefix string) zerolog.Logger {
@@ -34,7 +34,7 @@ func New(prefix string) zerolog.Logger {
level := config.Get().LogLevel
rotatingLogFile := &lumberjack.Logger{
Filename: GetLogPath(),
Filename: filepath.Join(GetLogPath(), "decypharr.log"),
MaxSize: 10,
MaxAge: 15,
Compress: true,

View File

@@ -1,7 +1,6 @@
package utils
import (
"context"
"fmt"
"github.com/go-co-op/gocron/v2"
"github.com/robfig/cron/v3"
@@ -10,25 +9,6 @@ import (
"time"
)
func ScheduleJob(ctx context.Context, interval string, loc *time.Location, jobFunc func()) (gocron.Scheduler, error) {
if loc == nil {
loc = time.Local
}
s, err := gocron.NewScheduler(gocron.WithLocation(loc))
if err != nil {
return s, fmt.Errorf("failed to create scheduler: %w", err)
}
jd, err := ConvertToJobDef(interval)
if err != nil {
return s, fmt.Errorf("failed to convert interval to job definition: %w", err)
}
// Schedule the job
if _, err = s.NewJob(jd, gocron.NewTask(jobFunc), gocron.WithContext(ctx)); err != nil {
return s, fmt.Errorf("failed to create job: %w", err)
}
return s, nil
}
// ConvertToJobDef converts a string interval to a gocron.JobDefinition.
func ConvertToJobDef(interval string) (gocron.JobDefinition, error) {
// Parse the interval string

View File

@@ -190,7 +190,7 @@ func (s *Storage) GetAll() []*Arr {
return arrs
}
func (s *Storage) StartSchedule(ctx context.Context) error {
func (s *Storage) StartWorker(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)

View File

@@ -133,7 +133,7 @@ func (a *Arr) CleanupQueue() error {
messages := q.StatusMessages
if len(messages) > 0 {
for _, m := range messages {
if strings.Contains(strings.Join(m.Messages, " "), "No files found are eligible for import in") {
if strings.Contains(strings.Join(m.Messages, " "), "No files found are eligible") {
isMessedUp = true
break
}

View File

@@ -9,13 +9,14 @@ import (
"github.com/sirrobot01/decypharr/internal/utils"
"github.com/sirrobot01/decypharr/pkg/arr"
"github.com/sirrobot01/decypharr/pkg/debrid/providers/alldebrid"
"github.com/sirrobot01/decypharr/pkg/debrid/providers/debrid_link"
"github.com/sirrobot01/decypharr/pkg/debrid/providers/debridlink"
"github.com/sirrobot01/decypharr/pkg/debrid/providers/realdebrid"
"github.com/sirrobot01/decypharr/pkg/debrid/providers/torbox"
debridStore "github.com/sirrobot01/decypharr/pkg/debrid/store"
"github.com/sirrobot01/decypharr/pkg/debrid/types"
"github.com/sirrobot01/decypharr/pkg/rclone"
"sync"
"time"
)
type Debrid struct {
@@ -69,7 +70,7 @@ func NewStorage(rcManager *rclone.Manager) *Storage {
_log := client.Logger()
if dc.UseWebDav {
if cfg.Rclone.Enabled && rcManager != nil {
mounter = rclone.NewMount(dc.Name, webdavUrl, rcManager)
mounter = rclone.NewMount(dc.Name, dc.RcloneMountPath, webdavUrl, rcManager)
}
cache = debridStore.NewDebridCache(dc, client, mounter)
_log.Info().Msg("Debrid Service started with WebDAV")
@@ -98,6 +99,47 @@ func (d *Storage) Debrid(name string) *Debrid {
return nil
}
func (d *Storage) StartWorker(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
// Start all debrid syncAccounts
// Runs every 1m
if err := d.syncAccounts(); err != nil {
return err
}
ticker := time.NewTicker(5 * time.Minute)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = d.syncAccounts()
}
}
}()
return nil
}
func (d *Storage) syncAccounts() error {
d.mu.Lock()
defer d.mu.Unlock()
for name, debrid := range d.debrids {
if debrid == nil || debrid.client == nil {
continue
}
_log := debrid.client.Logger()
if err := debrid.client.SyncAccounts(); err != nil {
_log.Error().Err(err).Msgf("Failed to sync account for %s", name)
continue
}
}
return nil
}
func (d *Storage) Debrids() map[string]*Debrid {
d.mu.RLock()
defer d.mu.RUnlock()
@@ -178,7 +220,7 @@ func createDebridClient(dc config.Debrid) (types.Client, error) {
case "torbox":
return torbox.New(dc)
case "debridlink":
return debrid_link.New(dc)
return debridlink.New(dc)
case "alldebrid":
return alldebrid.New(dc)
default:

View File

@@ -25,6 +25,7 @@ type AllDebrid struct {
autoExpiresLinksAfter time.Duration
DownloadUncached bool
client *request.Client
Profile *types.Profile `json:"profile"`
MountPath string
logger zerolog.Logger
@@ -33,10 +34,6 @@ type AllDebrid struct {
minimumFreeSlot int
}
func (ad *AllDebrid) GetProfile() (*types.Profile, error) {
return nil, nil
}
func New(dc config.Debrid) (*AllDebrid, error) {
rl := request.ParseRateLimit(dc.RateLimit)
@@ -449,6 +446,58 @@ func (ad *AllDebrid) GetAvailableSlots() (int, error) {
return 0, fmt.Errorf("GetAvailableSlots not implemented for AllDebrid")
}
func (ad *AllDebrid) GetProfile() (*types.Profile, error) {
if ad.Profile != nil {
return ad.Profile, nil
}
url := fmt.Sprintf("%s/user", ad.Host)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := ad.client.MakeRequest(req)
if err != nil {
return nil, err
}
var res UserProfileResponse
err = json.Unmarshal(resp, &res)
if err != nil {
ad.logger.Error().Err(err).Msgf("Error unmarshalling user profile")
return nil, err
}
if res.Status != "success" {
message := "unknown error"
if res.Error != nil {
message = res.Error.Message
}
return nil, fmt.Errorf("error getting user profile: %s", message)
}
userData := res.Data.User
expiration := time.Unix(userData.PremiumUntil, 0)
profile := &types.Profile{
Id: 1,
Name: ad.name,
Username: userData.Username,
Email: userData.Email,
Points: userData.FidelityPoints,
Premium: userData.PremiumUntil,
Expiration: expiration,
}
if userData.IsPremium {
profile.Type = "premium"
} else if userData.IsTrial {
profile.Type = "trial"
} else {
profile.Type = "free"
}
ad.Profile = profile
return profile, nil
}
func (ad *AllDebrid) Accounts() *types.Accounts {
return ad.accounts
}
func (ad *AllDebrid) SyncAccounts() error {
return nil
}

View File

@@ -112,3 +112,22 @@ func (m *Magnets) UnmarshalJSON(data []byte) error {
}
return fmt.Errorf("magnets: unsupported JSON format")
}
type UserProfileResponse struct {
Status string `json:"status"`
Error *errorResponse `json:"error"`
Data struct {
User struct {
Username string `json:"username"`
Email string `json:"email"`
IsPremium bool `json:"isPremium"`
IsSubscribed bool `json:"isSubscribed"`
IsTrial bool `json:"isTrial"`
PremiumUntil int64 `json:"premiumUntil"`
Lang string `json:"lang"`
FidelityPoints int `json:"fidelityPoints"`
LimitedHostersQuotas map[string]int `json:"limitedHostersQuotas"`
Notifications []string `json:"notifications"`
} `json:"user"`
} `json:"data"`
}

View File

@@ -1,4 +1,4 @@
package debrid_link
package debridlink
import (
"bytes"
@@ -30,6 +30,8 @@ type DebridLink struct {
logger zerolog.Logger
checkCached bool
addSamples bool
Profile *types.Profile `json:"profile,omitempty"`
}
func New(dc config.Debrid) (*DebridLink, error) {
@@ -66,10 +68,6 @@ func New(dc config.Debrid) (*DebridLink, error) {
}, nil
}
func (dl *DebridLink) GetProfile() (*types.Profile, error) {
return nil, nil
}
func (dl *DebridLink) Name() string {
return dl.name
}
@@ -476,6 +474,55 @@ func (dl *DebridLink) GetAvailableSlots() (int, error) {
return 0, fmt.Errorf("GetAvailableSlots not implemented for DebridLink")
}
func (dl *DebridLink) GetProfile() (*types.Profile, error) {
if dl.Profile != nil {
return dl.Profile, nil
}
url := fmt.Sprintf("%s/account/infos", dl.Host)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := dl.client.MakeRequest(req)
if err != nil {
return nil, err
}
var res UserInfo
err = json.Unmarshal(resp, &res)
if err != nil {
dl.logger.Error().Err(err).Msgf("Error unmarshalling user info")
return nil, err
}
if !res.Success || res.Value == nil {
return nil, fmt.Errorf("error getting user info")
}
data := *res.Value
expiration := time.Unix(data.PremiumLeft, 0)
profile := &types.Profile{
Id: 1,
Username: data.Username,
Name: dl.name,
Email: data.Email,
Points: data.Points,
Premium: data.PremiumLeft,
Expiration: expiration,
}
if expiration.IsZero() {
profile.Expiration = time.Now().AddDate(1, 0, 0) // Default to 1 year if no expiration
}
if data.PremiumLeft > 0 {
profile.Type = "premium"
} else {
profile.Type = "free"
}
dl.Profile = profile
return profile, nil
}
func (dl *DebridLink) Accounts() *types.Accounts {
return dl.accounts
}
func (dl *DebridLink) SyncAccounts() error {
return nil
}

View File

@@ -1,4 +1,4 @@
package debrid_link
package debridlink
type APIResponse[T any] struct {
Success bool `json:"success"`
@@ -43,3 +43,12 @@ type _torrentInfo struct {
type torrentInfo APIResponse[[]_torrentInfo]
type SubmitTorrentInfo APIResponse[_torrentInfo]
type UserInfo APIResponse[struct {
Username string `json:"username"`
Email string `json:"email"`
AccountType int `json:"accountType"`
PremiumLeft int64 `json:"premiumLeft"`
Points int `json:"pts"`
Trafficshare int `json:"trafficshare"`
}]

View File

@@ -161,6 +161,23 @@ func (r *RealDebrid) getSelectedFiles(t *types.Torrent, data torrentInfo) (map[s
return files, nil
}
func (r *RealDebrid) handleRarFallback(t *types.Torrent, data torrentInfo) (map[string]types.File, error) {
files := make(map[string]types.File)
file := types.File{
TorrentId: t.Id,
Id: "0",
Name: t.Name + ".rar",
Size: data.Bytes,
IsRar: true,
ByteRange: nil,
Path: t.Name + ".rar",
Link: data.Links[0],
Generated: time.Now(),
}
files[file.Name] = file
return files, nil
}
// handleRarArchive processes RAR archives with multiple files
func (r *RealDebrid) handleRarArchive(t *types.Torrent, data torrentInfo, selectedFiles []types.File) (map[string]types.File, error) {
// This will block if 2 RAR operations are already in progress
@@ -172,21 +189,8 @@ func (r *RealDebrid) handleRarArchive(t *types.Torrent, data torrentInfo, select
files := make(map[string]types.File)
if !r.UnpackRar {
r.logger.Debug().Msgf("RAR file detected, but unpacking is disabled: %s", t.Name)
// Create a single file representing the RAR archive
file := types.File{
TorrentId: t.Id,
Id: "0",
Name: t.Name + ".rar",
Size: 0,
IsRar: true,
ByteRange: nil,
Path: t.Name + ".rar",
Link: data.Links[0],
Generated: time.Now(),
}
files[file.Name] = file
return files, nil
r.logger.Debug().Msgf("RAR file detected, but unpacking is disabled: %s. Falling back to single file representation.", t.Name)
return r.handleRarFallback(t, data)
}
r.logger.Info().Msgf("RAR file detected, unpacking: %s", t.Name)
@@ -194,20 +198,23 @@ func (r *RealDebrid) handleRarArchive(t *types.Torrent, data torrentInfo, select
downloadLinkObj, err := r.GetDownloadLink(t, linkFile)
if err != nil {
return nil, fmt.Errorf("failed to get download link for RAR file: %w", err)
r.logger.Debug().Err(err).Msgf("Error getting download link for RAR file: %s. Falling back to single file representation.", t.Name)
return r.handleRarFallback(t, data)
}
dlLink := downloadLinkObj.DownloadLink
reader, err := rar.NewReader(dlLink)
if err != nil {
return nil, fmt.Errorf("failed to create RAR reader: %w", err)
r.logger.Debug().Err(err).Msgf("Error creating RAR reader for %s. Falling back to single file representation.", t.Name)
return r.handleRarFallback(t, data)
}
rarFiles, err := reader.GetFiles()
if err != nil {
return nil, fmt.Errorf("failed to read RAR files: %w", err)
r.logger.Debug().Err(err).Msgf("Error reading RAR files for %s. Falling back to single file representation.", t.Name)
return r.handleRarFallback(t, data)
}
// Create lookup map for faster matching
@@ -232,7 +239,11 @@ func (r *RealDebrid) handleRarArchive(t *types.Torrent, data torrentInfo, select
r.logger.Warn().Msgf("RAR file %s not found in torrent files", rarFile.Name())
}
}
if len(files) == 0 {
r.logger.Warn().Msgf("No valid files found in RAR archive for torrent: %s", t.Name)
return r.handleRarFallback(t, data)
}
r.logger.Info().Msgf("Unpacked RAR archive for torrent: %s with %d files", t.Name, len(files))
return files, nil
}
@@ -700,7 +711,7 @@ func (r *RealDebrid) _getDownloadLink(file *types.File) (*types.DownloadLink, er
func (r *RealDebrid) GetDownloadLink(t *types.Torrent, file *types.File) (*types.DownloadLink, error) {
accounts := r.accounts.All()
accounts := r.accounts.Active()
for _, account := range accounts {
r.downloadClient.SetHeader("Authorization", fmt.Sprintf("Bearer %s", account.Token))
@@ -835,7 +846,7 @@ func (r *RealDebrid) GetDownloadLinks() (map[string]*types.DownloadLink, error)
offset := 0
limit := 1000
accounts := r.accounts.All()
accounts := r.accounts.Active()
if len(accounts) < 1 {
// No active download keys. It's likely that the key has reached bandwidth limit
@@ -933,6 +944,7 @@ func (r *RealDebrid) GetProfile() (*types.Profile, error) {
return nil, err
}
profile := &types.Profile{
Name: r.name,
Id: data.Id,
Username: data.Username,
Email: data.Email,
@@ -962,3 +974,71 @@ func (r *RealDebrid) GetAvailableSlots() (int, error) {
func (r *RealDebrid) Accounts() *types.Accounts {
return r.accounts
}
func (r *RealDebrid) SyncAccounts() error {
// Sync accounts with the current configuration
if len(r.accounts.Active()) == 0 {
return nil
}
for idx, account := range r.accounts.Active() {
if err := r.syncAccount(idx, account); err != nil {
r.logger.Error().Err(err).Msgf("Error syncing account %s", account.Username)
continue // Skip this account and continue with the next
}
}
return nil
}
func (r *RealDebrid) syncAccount(index int, account *types.Account) error {
if account.Token == "" {
return fmt.Errorf("account %s has no token", account.Username)
}
client := http.DefaultClient
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/user", r.Host), nil)
if err != nil {
return fmt.Errorf("error creating request for account %s: %w", account.Username, err)
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", account.Token))
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("error checking account %s: %w", account.Username, err)
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return fmt.Errorf("account %s is not valid, status code: %d", account.Username, resp.StatusCode)
}
defer resp.Body.Close()
var profile profileResponse
if err := json.NewDecoder(resp.Body).Decode(&profile); err != nil {
return fmt.Errorf("error decoding profile for account %s: %w", account.Username, err)
}
account.Username = profile.Username
// Get traffic usage
trafficReq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/traffic/details", r.Host), nil)
if err != nil {
return fmt.Errorf("error creating request for traffic details for account %s: %w", account.Username, err)
}
trafficReq.Header.Set("Authorization", fmt.Sprintf("Bearer %s", account.Token))
trafficResp, err := client.Do(trafficReq)
if err != nil {
return fmt.Errorf("error checking traffic for account %s: %w", account.Username, err)
}
if trafficResp.StatusCode != http.StatusOK {
trafficResp.Body.Close()
return fmt.Errorf("error checking traffic for account %s, status code: %d", account.Username, trafficResp.StatusCode)
}
defer trafficResp.Body.Close()
var trafficData TrafficResponse
if err := json.NewDecoder(trafficResp.Body).Decode(&trafficData); err != nil {
return fmt.Errorf("error decoding traffic details for account %s: %w", account.Username, err)
}
today := time.Now().Format(time.DateOnly)
if todayData, exists := trafficData[today]; exists {
account.TrafficUsed = todayData.Bytes
}
r.accounts.Update(index, account)
return nil
}

View File

@@ -144,11 +144,11 @@ type profileResponse struct {
Id int64 `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Points int64 `json:"points"`
Points int `json:"points"`
Locale string `json:"locale"`
Avatar string `json:"avatar"`
Type string `json:"type"`
Premium int `json:"premium"`
Premium int64 `json:"premium"`
Expiration time.Time `json:"expiration"`
}
@@ -156,3 +156,10 @@ type AvailableSlotsResponse struct {
ActiveSlots int `json:"nb"`
TotalSlots int `json:"limit"`
}
type hostData struct {
Host map[string]int64 `json:"host"`
Bytes int64 `json:"bytes"`
}
type TrafficResponse map[string]hostData

View File

@@ -40,10 +40,6 @@ type Torbox struct {
addSamples bool
}
func (tb *Torbox) GetProfile() (*types.Profile, error) {
return nil, nil
}
func New(dc config.Debrid) (*Torbox, error) {
rl := request.ParseRateLimit(dc.RateLimit)
@@ -632,6 +628,14 @@ func (tb *Torbox) GetAvailableSlots() (int, error) {
return 0, fmt.Errorf("not implemented")
}
func (tb *Torbox) GetProfile() (*types.Profile, error) {
return nil, nil
}
func (tb *Torbox) Accounts() *types.Accounts {
return tb.accounts
}
func (tb *Torbox) SyncAccounts() error {
return nil
}

View File

@@ -122,9 +122,13 @@ func NewDebridCache(dc config.Debrid, client types.Client, mounter *rclone.Mount
cetSc, err := gocron.NewScheduler(gocron.WithLocation(cet))
if err != nil {
// If we can't create a CET scheduler, fallback to local time
cetSc, _ = gocron.NewScheduler(gocron.WithLocation(time.Local))
cetSc, _ = gocron.NewScheduler(gocron.WithLocation(time.Local), gocron.WithGlobalJobOptions(
gocron.WithTags("decypharr-"+dc.Name)))
}
scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.Local))
scheduler, err := gocron.NewScheduler(
gocron.WithLocation(time.Local),
gocron.WithGlobalJobOptions(
gocron.WithTags("decypharr-"+dc.Name)))
if err != nil {
// If we can't create a local scheduler, fallback to CET
scheduler = cetSc
@@ -254,11 +258,6 @@ func (c *Cache) Start(ctx context.Context) error {
// initial download links
go c.refreshDownloadLinks(ctx)
if err := c.StartSchedule(ctx); err != nil {
c.logger.Error().Err(err).Msg("Failed to start cache worker")
}
c.repairChan = make(chan RepairRequest, 100) // Initialize the repair channel, max 100 requests buffered
go c.repairWorker(ctx)
@@ -708,7 +707,7 @@ func (c *Cache) ProcessTorrent(t *types.Torrent) error {
Str("torrent_id", t.Id).
Str("torrent_name", t.Name).
Int("total_files", len(t.Files)).
Msg("Torrent still not complete after refresh")
Msg("Torrent still not complete after refresh, marking as bad")
} else {
addedOn, err := time.Parse(time.RFC3339, t.Added)

View File

@@ -6,11 +6,11 @@ import (
"github.com/sirrobot01/decypharr/internal/utils"
)
func (c *Cache) StartSchedule(ctx context.Context) error {
func (c *Cache) StartWorker(ctx context.Context) error {
// For now, we just want to refresh the listing and download links
// Stop any existing jobs before starting new ones
c.scheduler.RemoveByTags("decypharr")
c.scheduler.RemoveByTags("decypharr-%s", c.GetConfig().Name)
// Schedule download link refresh job
if jd, err := utils.ConvertToJobDef(c.downloadLinksRefreshInterval); err != nil {
@@ -19,7 +19,7 @@ func (c *Cache) StartSchedule(ctx context.Context) error {
// Schedule the job
if _, err := c.scheduler.NewJob(jd, gocron.NewTask(func() {
c.refreshDownloadLinks(ctx)
}), gocron.WithContext(ctx), gocron.WithTags("decypharr")); err != nil {
}), gocron.WithContext(ctx)); err != nil {
c.logger.Error().Err(err).Msg("Failed to create download link refresh job")
} else {
c.logger.Debug().Msgf("Download link refresh job scheduled for every %s", c.downloadLinksRefreshInterval)
@@ -33,7 +33,7 @@ func (c *Cache) StartSchedule(ctx context.Context) error {
// Schedule the job
if _, err := c.scheduler.NewJob(jd, gocron.NewTask(func() {
c.refreshTorrents(ctx)
}), gocron.WithContext(ctx), gocron.WithTags("decypharr")); err != nil {
}), gocron.WithContext(ctx)); err != nil {
c.logger.Error().Err(err).Msg("Failed to create torrent refresh job")
} else {
c.logger.Debug().Msgf("Torrent refresh job scheduled for every %s", c.torrentRefreshInterval)
@@ -49,7 +49,7 @@ func (c *Cache) StartSchedule(ctx context.Context) error {
// Schedule the job
if _, err := c.cetScheduler.NewJob(jd, gocron.NewTask(func() {
c.resetInvalidLinks(ctx)
}), gocron.WithContext(ctx), gocron.WithTags("decypharr")); err != nil {
}), gocron.WithContext(ctx)); err != nil {
c.logger.Error().Err(err).Msg("Failed to create link reset job")
} else {
c.logger.Debug().Msgf("Link reset job scheduled for every midnight, CET")

View File

@@ -33,15 +33,17 @@ func NewAccounts(debridConf config.Debrid) *Accounts {
}
type Account struct {
Debrid string // e.g., "realdebrid", "torbox", etc.
Order int
Disabled bool
Token string
links map[string]*DownloadLink
mu sync.RWMutex
Debrid string // e.g., "realdebrid", "torbox", etc.
Order int
Disabled bool
Token string `json:"token"`
links map[string]*DownloadLink
mu sync.RWMutex
TrafficUsed int64 `json:"traffic_used"` // Traffic used in bytes
Username string `json:"username"` // Username for the account
}
func (a *Accounts) All() []*Account {
func (a *Accounts) Active() []*Account {
a.mu.RLock()
defer a.mu.RUnlock()
activeAccounts := make([]*Account, 0)
@@ -53,6 +55,12 @@ func (a *Accounts) All() []*Account {
return activeAccounts
}
func (a *Accounts) All() []*Account {
a.mu.RLock()
defer a.mu.RUnlock()
return a.accounts
}
func (a *Accounts) Current() *Account {
a.mu.RLock()
if a.current != nil {
@@ -177,6 +185,23 @@ func (a *Accounts) SetDownloadLinks(links map[string]*DownloadLink) {
a.Current().setLinks(links)
}
func (a *Accounts) Update(index int, account *Account) {
a.mu.Lock()
defer a.mu.Unlock()
if index < 0 || index >= len(a.accounts) {
return // Index out of bounds
}
// Update the account at the specified index
a.accounts[index] = account
// If the updated account is the current one, update the current reference
if a.current == nil || a.current.Order == index {
a.current = account
}
}
func newAccount(debridName, token string, index int) *Account {
return &Account{
Debrid: debridName,
@@ -213,7 +238,6 @@ func (a *Account) LinksCount() int {
defer a.mu.RUnlock()
return len(a.links)
}
func (a *Account) disable() {
a.Disabled = true
}

View File

@@ -25,4 +25,5 @@ type Client interface {
DeleteDownloadLink(linkId string) error
GetProfile() (*Profile, error)
GetAvailableSlots() (int, error)
SyncAccounts() error // Updates each accounts details(like traffic, username, etc.)
}

View File

@@ -114,19 +114,27 @@ type IngestData struct {
Size int64 `json:"size"`
}
type LibraryStats struct {
Total int `json:"total"`
Bad int `json:"bad"`
ActiveLinks int `json:"active_links"`
}
type Stats struct {
Profile *Profile `json:"profile"`
Library LibraryStats `json:"library"`
Accounts []map[string]any `json:"accounts"`
}
type Profile struct {
Name string `json:"name"`
Id int64 `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Points int64 `json:"points"`
Points int `json:"points"`
Type string `json:"type"`
Premium int `json:"premium"`
Premium int64 `json:"premium"`
Expiration time.Time `json:"expiration"`
LibrarySize int `json:"library_size"`
BadTorrents int `json:"bad_torrents"`
ActiveLinks int `json:"active_links"`
}
type DownloadLink struct {

View File

@@ -70,7 +70,10 @@ func (m *Manager) performMount(provider, webdavURL string) error {
// Clean up any stale mount first
if exists && !existingMount.Mounted {
m.forceUnmountPath(mountPath)
err := m.forceUnmountPath(mountPath)
if err != nil {
return err
}
}
// Create rclone config for this provider
@@ -82,14 +85,13 @@ func (m *Manager) performMount(provider, webdavURL string) error {
mountArgs := map[string]interface{}{
"fs": fmt.Sprintf("%s:", provider),
"mountPoint": mountPath,
"mountType": "mount", // Use standard FUSE mount
"mountOpt": map[string]interface{}{
"AllowNonEmpty": true,
"AllowOther": true,
"DebugFUSE": false,
"DeviceName": fmt.Sprintf("decypharr-%s", provider),
"VolumeName": fmt.Sprintf("decypharr-%s", provider),
},
}
mountOpt := map[string]interface{}{
"AllowNonEmpty": true,
"AllowOther": true,
"DebugFUSE": false,
"DeviceName": fmt.Sprintf("decypharr-%s", provider),
"VolumeName": fmt.Sprintf("decypharr-%s", provider),
}
configOpts := make(map[string]interface{})
@@ -102,12 +104,13 @@ func (m *Manager) performMount(provider, webdavURL string) error {
// Only add _config if there are options to set
mountArgs["_config"] = configOpts
}
vfsOpt := map[string]interface{}{
"CacheMode": cfg.Rclone.VfsCacheMode,
}
vfsOpt["PollInterval"] = 0 // Poll interval not supported for webdav, set to 0
// Add VFS options if caching is enabled
if cfg.Rclone.VfsCacheMode != "off" {
vfsOpt := map[string]interface{}{
"CacheMode": cfg.Rclone.VfsCacheMode,
}
if cfg.Rclone.VfsCacheMaxAge != "" {
vfsOpt["CacheMaxAge"] = cfg.Rclone.VfsCacheMaxAge
@@ -130,22 +133,23 @@ func (m *Manager) performMount(provider, webdavURL string) error {
if cfg.Rclone.NoModTime {
vfsOpt["NoModTime"] = cfg.Rclone.NoModTime
}
mountArgs["vfsOpt"] = vfsOpt
}
// Add mount options based on configuration
if cfg.Rclone.UID != 0 {
mountArgs["mountOpt"].(map[string]interface{})["UID"] = cfg.Rclone.UID
mountOpt["UID"] = cfg.Rclone.UID
}
if cfg.Rclone.GID != 0 {
mountArgs["mountOpt"].(map[string]interface{})["GID"] = cfg.Rclone.GID
mountOpt["GID"] = cfg.Rclone.GID
}
if cfg.Rclone.AttrTimeout != "" {
if attrTimeout, err := time.ParseDuration(cfg.Rclone.AttrTimeout); err == nil {
mountArgs["mountOpt"].(map[string]interface{})["AttrTimeout"] = attrTimeout.String()
mountOpt["AttrTimeout"] = attrTimeout.String()
}
}
mountArgs["vfsOpt"] = vfsOpt
mountArgs["mountOpt"] = mountOpt
// Make the mount request
req := RCRequest{
Command: "mount/mount",

View File

@@ -45,7 +45,7 @@ func (m *Manager) checkMountHealth(provider string) bool {
Command: "operations/list",
Args: map[string]interface{}{
"fs": fmt.Sprintf("%s:", provider),
"remote": "/",
"remote": "",
},
}
@@ -71,7 +71,7 @@ func (m *Manager) RecoverMount(provider string) error {
}
// Wait a moment
time.Sleep(2 * time.Second)
time.Sleep(1 * time.Second)
// Try to remount
if err := m.Mount(provider, mountInfo.WebDAVURL); err != nil {

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"path/filepath"
"slices"
"sync"
"time"
@@ -24,7 +25,7 @@ type Manager struct {
rcPort string
rcUser string
rcPass string
configDir string
rcloneDir string
mounts map[string]*MountInfo
mountsMutex sync.RWMutex
logger zerolog.Logger
@@ -61,10 +62,10 @@ func NewManager() *Manager {
cfg := config.Get()
rcPort := "5572"
configDir := filepath.Join(cfg.Path, "rclone")
rcloneDir := filepath.Join(cfg.Path, "rclone")
// Ensure config directory exists
if err := os.MkdirAll(configDir, 0755); err != nil {
if err := os.MkdirAll(rcloneDir, 0755); err != nil {
_logger := logger.New("rclone")
_logger.Error().Err(err).Msg("Failed to create rclone config directory")
}
@@ -73,7 +74,7 @@ func NewManager() *Manager {
return &Manager{
rcPort: rcPort,
configDir: configDir,
rcloneDir: rcloneDir,
mounts: make(map[string]*MountInfo),
logger: logger.New("rclone"),
ctx: ctx,
@@ -98,20 +99,37 @@ func (m *Manager) Start(ctx context.Context) error {
return nil
}
logFile := filepath.Join(logger.GetLogPath(), "rclone.log")
// Delete old log file if it exists
if _, err := os.Stat(logFile); err == nil {
if err := os.Remove(logFile); err != nil {
return fmt.Errorf("failed to remove old rclone log file: %w", err)
}
}
args := []string{
"rcd",
"--rc-addr", ":" + m.rcPort,
"--rc-no-auth", // We'll handle auth at the application level
"--config", filepath.Join(m.configDir, "rclone.conf"),
"--log-level", "INFO",
"--config", filepath.Join(m.rcloneDir, "rclone.conf"),
"--log-file", logFile,
}
logLevel := cfg.Rclone.LogLevel
if logLevel != "" {
if !slices.Contains([]string{"DEBUG", "INFO", "NOTICE", "ERROR"}, logLevel) {
logLevel = "INFO"
}
args = append(args, "--log-level", logLevel)
}
if cfg.Rclone.CacheDir != "" {
if err := os.MkdirAll(cfg.Rclone.CacheDir, 0755); err == nil {
args = append(args, "--cache-dir", cfg.Rclone.CacheDir)
}
}
m.cmd = exec.CommandContext(ctx, "rclone", args...)
m.cmd.Dir = m.configDir
// Capture output for debugging
var stdout, stderr bytes.Buffer
@@ -119,9 +137,10 @@ func (m *Manager) Start(ctx context.Context) error {
m.cmd.Stderr = &stderr
if err := m.cmd.Start(); err != nil {
m.logger.Error().Str("stderr", stderr.String()).Str("stdout", stdout.String()).
Err(err).Msg("Failed to start rclone RC server")
return fmt.Errorf("failed to start rclone RC server: %w", err)
}
m.serverStarted = true
// Wait for server to be ready in a goroutine
@@ -160,9 +179,12 @@ func (m *Manager) Start(ctx context.Context) error {
default:
if code, ok := ExitCode(err); ok {
m.logger.Debug().Int("exit_code", code).Err(err).
Str("stderr", stderr.String()).
Str("stdout", stdout.String()).
Msg("Rclone RC server error")
} else {
m.logger.Debug().Err(err).Msg("Rclone RC server error (no exit code)")
m.logger.Debug().Err(err).Str("stderr", stderr.String()).
Str("stdout", stdout.String()).Msg("Rclone RC server error (no exit code)")
}
}
}()

View File

@@ -7,6 +7,7 @@ import (
"github.com/sirrobot01/decypharr/internal/config"
"net/url"
"path/filepath"
"strings"
)
// Mount represents a mount using the rclone RC client
@@ -19,15 +20,24 @@ type Mount struct {
}
// NewMount creates a new RC-based mount
func NewMount(provider, webdavURL string, rcManager *Manager) *Mount {
func NewMount(provider, customRcloneMount, webdavURL string, rcManager *Manager) *Mount {
cfg := config.Get()
mountPath := filepath.Join(cfg.Rclone.MountPath, provider)
var mountPath string
if customRcloneMount != "" {
mountPath = customRcloneMount
} else {
mountPath = filepath.Join(cfg.Rclone.MountPath, provider)
}
_url, err := url.JoinPath(webdavURL, provider)
if err != nil {
_url = fmt.Sprintf("%s/%s", webdavURL, provider)
}
if !strings.HasSuffix(_url, "/") {
_url += "/"
}
return &Mount{
Provider: provider,
LocalPath: mountPath,

View File

@@ -76,7 +76,7 @@ func (m *Manager) GetStats() (*Stats, error) {
}
// Get bandwidth stats
bwStats, err := m.GetBandwidthStats()
if err == nil {
if err == nil && bwStats != nil {
stats.Bandwidth = *bwStats
} else {
fmt.Println("Failed to get rclone stats", err)

View File

@@ -99,25 +99,58 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
}
clients := debrids.Clients()
caches := debrids.Caches()
profiles := make([]*debridTypes.Profile, 0)
debridStats := make([]debridTypes.Stats, 0)
for debridName, client := range clients {
debridStat := debridTypes.Stats{}
libraryStat := debridTypes.LibraryStats{}
profile, err := client.GetProfile()
profile.Name = debridName
if err != nil {
s.logger.Error().Err(err).Msg("Failed to get debrid profile")
continue
s.logger.Error().Err(err).Str("debrid", debridName).Msg("Failed to get debrid profile")
profile = &debridTypes.Profile{
Name: debridName,
}
}
profile.Name = debridName
debridStat.Profile = profile
cache, ok := caches[debridName]
if ok {
// Get torrent data
profile.LibrarySize = cache.TotalTorrents()
profile.BadTorrents = len(cache.GetListing("__bad__"))
profile.ActiveLinks = cache.GetTotalActiveDownloadLinks()
libraryStat.Total = cache.TotalTorrents()
libraryStat.Bad = len(cache.GetListing("__bad__"))
libraryStat.ActiveLinks = cache.GetTotalActiveDownloadLinks()
}
profiles = append(profiles, profile)
debridStat.Library = libraryStat
// Get detailed account information
accounts := client.Accounts().All()
accountDetails := make([]map[string]any, 0)
for _, account := range accounts {
// Mask token - show first 8 characters and last 4 characters
maskedToken := ""
if len(account.Token) > 12 {
maskedToken = account.Token[:8] + "****" + account.Token[len(account.Token)-4:]
} else if len(account.Token) > 8 {
maskedToken = account.Token[:4] + "****" + account.Token[len(account.Token)-2:]
} else {
maskedToken = "****"
}
accountDetail := map[string]any{
"order": account.Order,
"disabled": account.Disabled,
"token_masked": maskedToken,
"username": account.Username,
"traffic_used": account.TrafficUsed,
"links_count": account.LinksCount(),
"debrid": account.Debrid,
}
accountDetails = append(accountDetails, accountDetail)
}
debridStat.Accounts = accountDetails
debridStats = append(debridStats, debridStat)
}
stats["debrids"] = profiles
stats["debrids"] = debridStats
// Add rclone stats if available
if rcManager := store.Get().RcloneManager(); rcManager != nil && rcManager.IsReady() {

View File

@@ -12,6 +12,7 @@ import (
"io"
"net/http"
"os"
"path/filepath"
)
type Server struct {
@@ -36,11 +37,12 @@ func New(handlers map[string]http.Handler) *Server {
}
//logs
r.Get("/logs", s.getLogs)
r.Get("/logs", s.getLogs) // deprecated, use /debug/logs
//debugs
r.Route("/debug", func(r chi.Router) {
r.Get("/stats", s.handleStats)
r.Get("/logs", s.getLogs)
r.Get("/logs/rclone", s.getRcloneLogs)
r.Get("/ingests", s.handleIngests)
r.Get("/ingests/{debrid}", s.handleIngestsByDebrid)
})
@@ -75,7 +77,7 @@ func (s *Server) Start(ctx context.Context) error {
}
func (s *Server) getLogs(w http.ResponseWriter, r *http.Request) {
logFile := logger.GetLogPath()
logFile := filepath.Join(logger.GetLogPath(), "decypharr.log")
// Open and read the file
file, err := os.Open(logFile)
@@ -98,5 +100,42 @@ func (s *Server) getLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Expires", "0")
// Stream the file
_, _ = io.Copy(w, file)
if _, err := io.Copy(w, file); err != nil {
s.logger.Error().Err(err).Msg("Error streaming log file")
http.Error(w, "Error streaming log file", http.StatusInternalServerError)
return
}
}
func (s *Server) getRcloneLogs(w http.ResponseWriter, r *http.Request) {
// Rclone logs resides in the same directory as the application logs
logFile := filepath.Join(logger.GetLogPath(), "rclone.log")
// Open and read the file
file, err := os.Open(logFile)
if err != nil {
http.Error(w, "Error reading log file", http.StatusInternalServerError)
return
}
defer func(file *os.File) {
err := file.Close()
if err != nil {
s.logger.Error().Err(err).Msg("Error closing log file")
return
}
}(file)
// Set headers
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Content-Disposition", "inline; filename=application.log")
w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
w.Header().Set("Pragma", "no-cache")
w.Header().Set("Expires", "0")
// Stream the file
if _, err := io.Copy(w, file); err != nil {
s.logger.Error().Err(err).Msg("Error streaming log file")
http.Error(w, "Error streaming log file", http.StatusInternalServerError)
return
}
}

View File

@@ -3,6 +3,8 @@ package store
import (
"context"
"fmt"
"github.com/go-co-op/gocron/v2"
"github.com/sirrobot01/decypharr/internal/utils"
"time"
)
@@ -25,58 +27,51 @@ func (s *Store) addToQueue(importReq *ImportRequest) error {
return nil
}
func (s *Store) StartQueueSchedule(ctx context.Context) error {
// Start the slots processing in a separate goroutine
go func() {
if err := s.processSlotsQueue(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error processing slots queue")
}
}()
func (s *Store) StartQueueWorkers(ctx context.Context) error {
// This function is responsible for starting the scheduled tasks
// Start the remove stalled torrents processing in a separate goroutine
go func() {
if err := s.processRemoveStalledTorrents(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error processing remove stalled torrents")
}
}()
if ctx == nil {
ctx = context.Background()
}
return nil
}
s.scheduler.RemoveByTags("decypharr-store")
func (s *Store) processSlotsQueue(ctx context.Context) error {
s.trackAvailableSlots(ctx) // Initial tracking of available slots
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if jd, err := utils.ConvertToJobDef("30s"); err != nil {
s.logger.Error().Err(err).Msg("Failed to convert slots tracking interval to job definition")
} else {
// Schedule the job
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
s.trackAvailableSlots(ctx)
}), gocron.WithContext(ctx)); err != nil {
s.logger.Error().Err(err).Msg("Failed to create slots tracking job")
} else {
s.logger.Trace().Msgf("Download link refresh job scheduled for every %s", "30s")
}
}
}
func (s *Store) processRemoveStalledTorrents(ctx context.Context) error {
if s.removeStalledAfter <= 0 {
return nil // No need to remove stalled torrents if the duration is not set
}
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := s.removeStalledTorrents(ctx); err != nil {
s.logger.Error().Err(err).Msg("Error removing stalled torrents")
if s.removeStalledAfter > 0 {
// Stalled torrents removal job
if jd, err := utils.ConvertToJobDef("1m"); err != nil {
s.logger.Error().Err(err).Msg("Failed to convert remove stalled torrents interval to job definition")
} else {
// Schedule the job
if _, err := s.scheduler.NewJob(jd, gocron.NewTask(func() {
err := s.removeStalledTorrents(ctx)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to process remove stalled torrents")
}
}), gocron.WithContext(ctx)); err != nil {
s.logger.Error().Err(err).Msg("Failed to create remove stalled torrents job")
} else {
s.logger.Trace().Msgf("Remove stalled torrents job scheduled for every %s", "1m")
}
}
}
// Start the scheduler
s.scheduler.Start()
s.logger.Debug().Msg("Store worker started")
return nil
}
func (s *Store) trackAvailableSlots(ctx context.Context) {

View File

@@ -3,6 +3,7 @@ package store
import (
"cmp"
"context"
"github.com/go-co-op/gocron/v2"
"github.com/rs/zerolog"
"github.com/sirrobot01/decypharr/internal/config"
"github.com/sirrobot01/decypharr/internal/logger"
@@ -26,6 +27,7 @@ type Store struct {
skipPreCache bool
downloadSemaphore chan struct{}
removeStalledAfter time.Duration // Duration after which stalled torrents are removed
scheduler gocron.Scheduler
}
var (
@@ -49,6 +51,11 @@ func Get() *Store {
arrs := arr.NewStorage()
deb := debrid.NewStorage(rcManager)
scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.Local), gocron.WithGlobalJobOptions(gocron.WithTags("decypharr-store")))
if err != nil {
scheduler, _ = gocron.NewScheduler(gocron.WithGlobalJobOptions(gocron.WithTags("decypharr-store")))
}
instance = &Store{
repair: repair.New(arrs, deb),
arr: arrs,
@@ -56,10 +63,11 @@ func Get() *Store {
rcloneManager: rcManager,
torrents: newTorrentStorage(cfg.TorrentsFile()),
logger: logger.Default(), // Use default logger [decypharr]
refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 10)) * time.Minute,
refreshInterval: time.Duration(cmp.Or(qbitCfg.RefreshInterval, 30)) * time.Second,
skipPreCache: qbitCfg.SkipPreCache,
downloadSemaphore: make(chan struct{}, cmp.Or(qbitCfg.MaxDownloads, 5)),
importsQueue: NewImportQueue(context.Background(), 1000),
scheduler: scheduler,
}
if cfg.RemoveStalledAfter != "" {
removeStalledAfter, err := time.ParseDuration(cfg.RemoveStalledAfter)
@@ -88,6 +96,11 @@ func Reset() {
// Close the semaphore channel to
close(instance.downloadSemaphore)
}
if instance.scheduler != nil {
_ = instance.scheduler.StopJobs()
_ = instance.scheduler.Shutdown()
}
}
once = sync.Once{}
instance = nil
@@ -108,3 +121,7 @@ func (s *Store) Torrents() *TorrentStorage {
func (s *Store) RcloneManager() *rclone.Manager {
return s.rcloneManager
}
func (s *Store) Scheduler() gocron.Scheduler {
return s.scheduler
}

44
pkg/store/worker.go Normal file
View File

@@ -0,0 +1,44 @@
package store
import "context"
func (s *Store) StartWorkers(ctx context.Context) {
if ctx == nil {
ctx = context.Background()
}
// Start debrid workers
if err := s.Debrid().StartWorker(ctx); err != nil {
s.logger.Error().Err(err).Msg("Failed to start debrid worker")
} else {
s.logger.Debug().Msg("Started debrid worker")
}
// Cache workers
for _, cache := range s.Debrid().Caches() {
if cache == nil {
continue
}
go func() {
if err := cache.StartWorker(ctx); err != nil {
s.logger.Error().Err(err).Msg("Failed to start debrid cache worker")
} else {
s.logger.Debug().Msgf("Started debrid cache worker for %s", cache.GetConfig().Name)
}
}()
}
// Store queue workers
if err := s.StartQueueWorkers(ctx); err != nil {
s.logger.Error().Err(err).Msg("Failed to start store worker")
} else {
s.logger.Debug().Msg("Started store worker")
}
// Arr workers
if err := s.Arr().StartWorker(ctx); err != nil {
s.logger.Error().Err(err).Msg("Failed to start Arr worker")
} else {
s.logger.Debug().Msg("Started Arr worker")
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -152,7 +152,7 @@ class ConfigManager {
'enabled', 'mount_path', 'cache_dir', 'vfs_cache_mode', 'vfs_cache_max_size', 'vfs_cache_max_age',
'vfs_cache_poll_interval', 'vfs_read_chunk_size', 'vfs_read_chunk_size_limit', 'buffer_size',
'uid', 'gid', 'vfs_read_ahead', 'attr_timeout', 'dir_cache_time', 'poll_interval', 'umask',
'no_modtime', 'no_checksum'
'no_modtime', 'no_checksum', 'log_level'
];
fields.forEach(field => {
@@ -246,7 +246,7 @@ class ConfigManager {
<select class="select select-bordered" name="debrid[${index}].name" id="debrid[${index}].name" required>
<option value="realdebrid">Real Debrid</option>
<option value="alldebrid">AllDebrid</option>
<option value="debrid_link">Debrid Link</option>
<option value="debridlink">Debrid Link</option>
<option value="torbox">Torbox</option>
</select>
</div>
@@ -313,6 +313,19 @@ class ConfigManager {
<span class="label-text-alt">API rate limit for this service</span>
</div>
</div>
<div class="form-control">
<label class="label" for="debrid[${index}].rclone_mount_path">
<span class="label-text font-medium">Custom Rclone Mount Path</span>
<span class="badge badge-ghost badge-sm">Optional</span>
</label>
<input type="text" class="input input-bordered"
name="debrid[${index}].rclone_mount_path" id="debrid[${index}].rclone_mount_path"
placeholder="/custom/mount/path (leave empty for global mount path)">
<div class="label">
<span class="label-text-alt">Custom mount path for this debrid service. If empty, uses global rclone mount path.</span>
</div>
</div>
<div class="form-control">
<label class="label" for="debrid[${index}].proxy">
<span class="label-text font-medium">Proxy</span>
@@ -531,11 +544,6 @@ class ConfigManager {
if (toggle.checked || forceShow) {
webdavSection.classList.remove('hidden');
// Add required attributes to key fields
webdavSection.querySelectorAll('input[name$=".torrents_refresh_interval"]').forEach(el => el.required = true);
webdavSection.querySelectorAll('input[name$=".download_links_refresh_interval"]').forEach(el => el.required = true);
webdavSection.querySelectorAll('input[name$=".auto_expire_links_after"]').forEach(el => el.required = true);
webdavSection.querySelectorAll('input[name$=".workers"]').forEach(el => el.required = true);
} else {
webdavSection.classList.add('hidden');
// Remove required attributes
@@ -923,7 +931,7 @@ class ConfigManager {
<option value="" selected>Auto-select</option>
<option value="realdebrid">Real Debrid</option>
<option value="alldebrid">AllDebrid</option>
<option value="debrid_link">Debrid Link</option>
<option value="debridlink">Debrid Link</option>
<option value="torbox">Torbox</option>
</select>
<div class="label">
@@ -1094,6 +1102,7 @@ class ConfigManager {
api_key: document.querySelector(`[name="debrid[${i}].api_key"]`).value,
folder: document.querySelector(`[name="debrid[${i}].folder"]`).value,
rate_limit: document.querySelector(`[name="debrid[${i}].rate_limit"]`).value,
rclone_mount_path: document.querySelector(`[name="debrid[${i}].rclone_mount_path"]`).value,
proxy: document.querySelector(`[name="debrid[${i}].proxy"]`).value,
download_uncached: document.querySelector(`[name="debrid[${i}].download_uncached"]`).checked,
unpack_rar: document.querySelector(`[name="debrid[${i}].unpack_rar"]`).checked,
@@ -1239,6 +1248,7 @@ class ConfigManager {
dir_cache_time: getElementValue('dir_cache_time', '5m'),
no_modtime: getElementValue('no_modtime', false),
no_checksum: getElementValue('no_checksum', false),
log_level: getElementValue('log_level', 'INFO'),
};
}

View File

@@ -466,6 +466,18 @@
</div>
</div>
<div class="form-control">
<label class="label" for="rclone.log_level">
<span class="label-text font-medium">Log Level</span>
</label>
<select class="select select-bordered" name="rclone.log_level" id="rclone.log_level">
<option value="INFO">INFO</option>
<option value="DEBUG">DEBUG</option>
<option value="NOTICE">NOTICE</option>
<option value="ERROR">ERROR</option>
</select>
</div>
<div class="form-control">
<label class="label" for="rclone.uid">
<span class="label-text font-medium">User ID (PUID)</span>

View File

@@ -93,7 +93,7 @@
<i class="bi bi-cloud"></i>
<span class="hidden xl:inline">WebDAV</span>
</a></li>
<li><a href="{{.URLBase}}logs" target="_blank" class="tooltip tooltip-bottom" data-tip="System Logs">
<li><a href="{{.URLBase}}debug/logs" target="_blank" class="tooltip tooltip-bottom" data-tip="System Logs">
<i class="bi bi-journal-text"></i>
<span class="hidden xl:inline">Logs</span>
</a></li>

View File

@@ -72,10 +72,16 @@
<div class="card bg-base-100 shadow-xl" id="rclone-card">
<div class="card-header p-6 pb-3">
<h2 class="card-title text-xl">
<i class="bi bi-cloud-arrow-up text-primary"></i>
Rclone Statistics
</h2>
<div class="card-title text-xl justify-between items-center">
<h2>
<i class="bi bi-cloud-arrow-up text-primary"></i>
Rclone Statistics
</h2>
<a href="{{.URLBase}}debug/logs/rclone" class="btn btn-sm btn-outline" target="_blank">
<i class="bi bi-arrow-right"></i>
View Rclone Logs
</a>
</div>
<div class="badge" id="rclone-status">Unknown</div>
</div>
<div class="card-body p-6 pt-3" id="rclone-content">
@@ -179,9 +185,9 @@
<div class="stat-desc">Total: ${cs.totalChecks || 0}</div>
</div>
<div class="stat">
<div class="stat-title">Elapsed Time</div>
<div class="stat-title">Uptime</div>
<div class="stat-value text-accent">${window.decypharrUtils.formatDuration(cs.elapsedTime)}</div>
<div class="stat-desc">Transfer: ${window.decypharrUtils.formatDuration(cs.transferTime)}m</div>
<div class="stat-desc">Transfer: ${window.decypharrUtils.formatDuration(cs.transferTime)}</div>
</div>
`;
}
@@ -312,37 +318,96 @@
let html = '<div class="space-y-4">';
debrids.forEach(debrid => {
const profile = debrid.profile || {};
const library = debrid.library || {};
const accounts = debrid.accounts || [];
html += `
<div class="card bg-base-200">
<div class="card-body p-4">
<div class="flex justify-between items-start">
<div>
<h3 class="card-title text-lg">${debrid.name || 'Unknown Service'}</h3>
<p class="text-sm text-base-content/70">${debrid.username || 'No username'}</p>
<h3 class="card-title text-lg">${profile.name || 'Unknown Service'}</h3>
<p class="text-sm text-base-content/70">${profile.username || 'No username'}</p>
</div>
<div class="text-right">
<div class="text-sm font-mono">${debrid.points} points</div>
<div class="text-xs text-base-content/70">Expires: ${debrid.expiration || 'Unknown'}</div>
<div class="text-sm font-mono">${formatNumber(profile.points || 0)} points</div>
<div class="text-xs text-base-content/70">Type: ${profile.type || 'Unknown'}</div>
<div class="text-xs text-base-content/70">Expires: ${profile.expiration ? new Date(profile.expiration).toLocaleDateString() : 'Unknown'}</div>
</div>
</div>
<div class="grid grid-cols-2 md:grid-cols-4 gap-3 mt-4">
<div class="stat">
<div class="stat-title text-xs">Library Size</div>
<div class="stat-value text-sm">${formatNumber(debrid.library_size || 0)}</div>
<div class="stat-value text-sm">${formatNumber(library.total || 0)}</div>
</div>
<div class="stat">
<div class="stat-title text-xs">Bad Torrents</div>
<div class="stat-value text-sm text-error">${formatNumber(debrid.bad_torrents || 0)}</div>
<div class="stat-value text-sm text-error">${formatNumber(library.bad || 0)}</div>
</div>
<div class="stat">
<div class="stat-title text-xs">Active Links</div>
<div class="stat-value text-sm text-success">${formatNumber(debrid.active_links || 0)}</div>
<div class="stat-value text-sm text-success">${formatNumber(library.active_links || 0)}</div>
</div>
<div class="stat">
<div class="stat-title text-xs">Type</div>
<div class="stat-value text-sm">${debrid.type || 'Unknown'}</div>
<div class="stat-title text-xs">Total Accounts</div>
<div class="stat-value text-sm text-info">${accounts.length}</div>
</div>
</div>
`;
// Add accounts section if there are accounts
if (accounts && accounts.length > 0) {
html += `
<div class="mt-6">
<h4 class="text-lg font-semibold mb-3">
<i class="bi bi-person-lines-fill text-primary"></i>
Accounts
</h4>
<div class="grid grid-cols-2 md:grid-cols-2 gap-2">
`;
accounts.forEach((account, index) => {
const statusBadge = account.disabled ?
'<span class="badge badge-error badge-sm">Disabled</span>' :
'<span class="badge badge-success badge-sm">Active</span>';
html += `
<div class="card bg-base-100 compact">
<div class="card-body p-3">
<div class="flex justify-between items-start mb-2">
<div class="flex-1">
<div class="flex items-center gap-2">
<h5 class="font-medium text-sm">Account #${account.order + 1}</h5>
${statusBadge}
</div>
<p class="text-xs text-base-content/70 mt-1">${account.username || 'No username'}</p>
</div>
<div class="text-right">
<div class="text-xs font-mono text-base-content/80">
Token: ${account.token_masked || '****'}
</div>
</div>
</div>
<div class="grid grid-cols-2 md:grid-cols-2 gap-2">
<div class="stat bg-base-200 rounded p-2">
<div class="stat-title text-xs">Traffic Used</div>
<div class="stat-value text-xs">${window.decypharrUtils.formatBytes(account.traffic_used || 0)}</div>
</div>
<div class="stat bg-base-200 rounded p-2">
<div class="stat-title text-xs">Links Count</div>
<div class="stat-value text-xs">${formatNumber(account.links_count || 0)}</div>
</div>
</div>
</div>
</div>
`;
});
html += '</div></div>';
}
html += `
</div>
</div>
`;