package rclone import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "os" "os/exec" "path/filepath" "slices" "strings" "sync" "time" "github.com/rs/zerolog" "github.com/sirrobot01/decypharr/internal/config" "github.com/sirrobot01/decypharr/internal/logger" ) // Manager handles the rclone RC server and provides mount operations type Manager struct { cmd *exec.Cmd rcPort string rcUser string rcPass string rcloneDir string mounts map[string]*MountInfo mountsMutex sync.RWMutex logger zerolog.Logger ctx context.Context cancel context.CancelFunc httpClient *http.Client serverReady chan struct{} serverStarted bool mu sync.RWMutex } type MountInfo struct { Provider string `json:"provider"` LocalPath string `json:"local_path"` WebDAVURL string `json:"webdav_url"` Mounted bool `json:"mounted"` MountedAt string `json:"mounted_at,omitempty"` ConfigName string `json:"config_name"` Error string `json:"error,omitempty"` } type RCRequest struct { Command string `json:"command"` Args map[string]interface{} `json:"args,omitempty"` } type RCResponse struct { Result interface{} `json:"result,omitempty"` Error string `json:"error,omitempty"` } // NewManager creates a new rclone RC manager func NewManager() *Manager { cfg := config.Get() rcPort := "5572" rcloneDir := filepath.Join(cfg.Path, "rclone") // Ensure config directory exists if err := os.MkdirAll(rcloneDir, 0755); err != nil { _logger := logger.New("rclone") _logger.Error().Err(err).Msg("Failed to create rclone config directory") } ctx, cancel := context.WithCancel(context.Background()) return &Manager{ rcPort: rcPort, rcloneDir: rcloneDir, mounts: make(map[string]*MountInfo), logger: logger.New("rclone"), ctx: ctx, cancel: cancel, httpClient: &http.Client{Timeout: 60 * time.Second}, serverReady: make(chan struct{}), } } // Start starts the rclone RC server func (m *Manager) Start(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() if m.serverStarted { return nil } cfg := config.Get() if !cfg.Rclone.Enabled { m.logger.Info().Msg("Rclone is disabled, skipping RC server startup") return nil } logFile := filepath.Join(logger.GetLogPath(), "rclone.log") // Delete old log file if it exists if _, err := os.Stat(logFile); err == nil { if err := os.Remove(logFile); err != nil { return fmt.Errorf("failed to remove old rclone log file: %w", err) } } args := []string{ "rcd", "--rc-addr", ":" + m.rcPort, "--rc-no-auth", // We'll handle auth at the application level "--config", filepath.Join(m.rcloneDir, "rclone.conf"), "--log-file", logFile, } logLevel := cfg.Rclone.LogLevel if logLevel != "" { if !slices.Contains([]string{"DEBUG", "INFO", "NOTICE", "ERROR"}, logLevel) { logLevel = "INFO" } args = append(args, "--log-level", logLevel) } if cfg.Rclone.CacheDir != "" { if err := os.MkdirAll(cfg.Rclone.CacheDir, 0755); err == nil { args = append(args, "--cache-dir", cfg.Rclone.CacheDir) } } m.cmd = exec.CommandContext(ctx, "rclone", args...) // Capture output for debugging var stdout, stderr bytes.Buffer m.cmd.Stdout = &stdout m.cmd.Stderr = &stderr if err := m.cmd.Start(); err != nil { m.logger.Error().Str("stderr", stderr.String()).Str("stdout", stdout.String()). Err(err).Msg("Failed to start rclone RC server") return fmt.Errorf("failed to start rclone RC server: %w", err) } m.serverStarted = true // Wait for server to be ready in a goroutine go func() { defer func() { if r := recover(); r != nil { m.logger.Error().Interface("panic", r).Msg("Panic in rclone RC server monitor") } }() m.waitForServer() close(m.serverReady) // Start mount monitoring once server is ready go func() { defer func() { if r := recover(); r != nil { m.logger.Error().Interface("panic", r).Msg("Panic in mount monitor") } }() m.MonitorMounts(ctx) }() // Wait for command to finish and log output err := m.cmd.Wait() switch { case err == nil: m.logger.Info().Msg("Rclone RC server exited normally") case errors.Is(err, context.Canceled): m.logger.Info().Msg("Rclone RC server terminated: context canceled") case WasHardTerminated(err): // SIGKILL on *nix; non-zero exit on Windows m.logger.Info().Msg("Rclone RC server hard-terminated") default: if code, ok := ExitCode(err); ok { m.logger.Debug().Int("exit_code", code).Err(err). Str("stderr", stderr.String()). Str("stdout", stdout.String()). Msg("Rclone RC server error") } else { m.logger.Debug().Err(err).Str("stderr", stderr.String()). Str("stdout", stdout.String()).Msg("Rclone RC server error (no exit code)") } } }() return nil } // Stop stops the rclone RC server and unmounts all mounts func (m *Manager) Stop() error { m.mu.Lock() defer m.mu.Unlock() if !m.serverStarted { return nil } m.logger.Info().Msg("Stopping rclone RC server") // Unmount all mounts first m.mountsMutex.RLock() mountList := make([]*MountInfo, 0, len(m.mounts)) for _, mount := range m.mounts { if mount.Mounted { mountList = append(mountList, mount) } } m.mountsMutex.RUnlock() // Unmount in parallel var wg sync.WaitGroup for _, mount := range mountList { wg.Add(1) go func(mount *MountInfo) { defer wg.Done() if err := m.unmount(mount.Provider); err != nil { m.logger.Error().Err(err).Str("provider", mount.Provider).Msg("Failed to unmount during shutdown") } }(mount) } // Wait for unmounts with timeout done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: m.logger.Info().Msg("All mounts unmounted successfully") case <-time.After(30 * time.Second): m.logger.Warn().Msg("Timeout waiting for mounts to unmount, proceeding with shutdown") } // Cancel context and stop process m.cancel() if m.cmd != nil && m.cmd.Process != nil { // Try graceful shutdown first if err := m.cmd.Process.Signal(os.Interrupt); err != nil { m.logger.Warn().Err(err).Msg("Failed to send interrupt signal, using kill") if killErr := m.cmd.Process.Kill(); killErr != nil { m.logger.Error().Err(killErr).Msg("Failed to kill rclone process") return killErr } } // Wait for process to exit with timeout done := make(chan error, 1) go func() { done <- m.cmd.Wait() }() select { case err := <-done: if err != nil && !errors.Is(err, context.Canceled) && !WasHardTerminated(err) { m.logger.Warn().Err(err).Msg("Rclone process exited with error") } else { m.logger.Info().Msg("Rclone process exited gracefully") } case <-time.After(2 * time.Second): m.logger.Warn().Msg("Timeout waiting for rclone to exit, force killing") if err := m.cmd.Process.Kill(); err != nil { // Check if the process already finished if !strings.Contains(err.Error(), "process already finished") { m.logger.Error().Err(err).Msg("Failed to force kill rclone process") return err } m.logger.Info().Msg("Process already finished during kill attempt") } // Still wait for the Wait() to complete to clean up the process select { case <-done: m.logger.Info().Msg("Rclone process cleanup completed") case <-time.After(5 * time.Second): m.logger.Error().Msg("Process cleanup timeout") } } } m.serverStarted = false m.logger.Info().Msg("Rclone RC server stopped") return nil } // waitForServer waits for the RC server to become available func (m *Manager) waitForServer() { maxAttempts := 30 for i := 0; i < maxAttempts; i++ { if m.ctx.Err() != nil { return } if m.pingServer() { m.logger.Info().Msg("Rclone RC server is ready") return } time.Sleep(time.Second) } m.logger.Error().Msg("Rclone RC server not responding - mount operations will be disabled") } // pingServer checks if the RC server is responding func (m *Manager) pingServer() bool { req := RCRequest{Command: "core/version"} _, err := m.makeRequest(req, true) return err == nil } func (m *Manager) makeRequest(req RCRequest, close bool) (*http.Response, error) { reqBody, err := json.Marshal(req.Args) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } url := fmt.Sprintf("http://localhost:%s/%s", m.rcPort, req.Command) httpReq, err := http.NewRequestWithContext(m.ctx, "POST", url, bytes.NewBuffer(reqBody)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") resp, err := m.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("failed to make request: %w", err) } if resp.StatusCode != http.StatusOK { // Read the response body to get more details defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { m.logger.Debug().Err(err).Msg("Failed to close response body") } }(resp.Body) var errorResp RCResponse if err := json.NewDecoder(resp.Body).Decode(&errorResp); err != nil { return nil, fmt.Errorf("request failed with status %s, but could not decode error response: %w", resp.Status, err) } if errorResp.Error != "" { return nil, fmt.Errorf("%s", errorResp.Error) } else { return nil, fmt.Errorf("request failed with status %s and no error message", resp.Status) } } if close { defer func() { if err := resp.Body.Close(); err != nil { m.logger.Debug().Err(err).Msg("Failed to close response body") } }() } return resp, nil } // IsReady returns true if the RC server is ready func (m *Manager) IsReady() bool { select { case <-m.serverReady: return true default: return false } } // WaitForReady waits for the RC server to be ready func (m *Manager) WaitForReady(timeout time.Duration) error { select { case <-m.serverReady: return nil case <-time.After(timeout): return fmt.Errorf("timeout waiting for rclone RC server to be ready") case <-m.ctx.Done(): return m.ctx.Err() } } func (m *Manager) GetLogger() zerolog.Logger { return m.logger }