- Fix issues with new setup
- Fix arr setup getting thr wrong crendentials - Add file link invalidator - Other minor bug fixes
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/sirrobot01/decypharr/pkg/rclone"
|
||||
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
"encoding/json"
|
||||
_ "time/tzdata"
|
||||
@@ -88,6 +89,7 @@ type Cache struct {
|
||||
invalidDownloadLinks *xsync.Map[string, string]
|
||||
repairRequest *xsync.Map[string, *reInsertRequest]
|
||||
failedToReinsert *xsync.Map[string, struct{}]
|
||||
failedLinksCounter *xsync.Map[string, *atomic.Int32] // link -> counter
|
||||
|
||||
// repair
|
||||
repairChan chan RepairRequest
|
||||
@@ -112,7 +114,8 @@ type Cache struct {
|
||||
config config.Debrid
|
||||
customFolders []string
|
||||
mounter *rclone.Mount
|
||||
httpClient *http.Client
|
||||
downloadSG singleflight.Group
|
||||
streamClient *http.Client
|
||||
}
|
||||
|
||||
func NewDebridCache(dc config.Debrid, client common.Client, mounter *rclone.Mount) *Cache {
|
||||
@@ -160,10 +163,13 @@ func NewDebridCache(dc config.Debrid, client common.Client, mounter *rclone.Moun
|
||||
_log := logger.New(fmt.Sprintf("%s-webdav", client.Name()))
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ResponseHeaderTimeout: 30 * time.Second,
|
||||
MaxIdleConns: 10,
|
||||
MaxIdleConnsPerHost: 2,
|
||||
TLSHandshakeTimeout: 30 * time.Second,
|
||||
ResponseHeaderTimeout: 60 * time.Second,
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 20,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
DisableKeepAlives: false,
|
||||
ForceAttemptHTTP2: false,
|
||||
}
|
||||
httpClient := &http.Client{
|
||||
Transport: transport,
|
||||
@@ -189,10 +195,11 @@ func NewDebridCache(dc config.Debrid, client common.Client, mounter *rclone.Moun
|
||||
mounter: mounter,
|
||||
|
||||
ready: make(chan struct{}),
|
||||
httpClient: httpClient,
|
||||
invalidDownloadLinks: xsync.NewMap[string, string](),
|
||||
repairRequest: xsync.NewMap[string, *reInsertRequest](),
|
||||
failedToReinsert: xsync.NewMap[string, struct{}](),
|
||||
failedLinksCounter: xsync.NewMap[string, *atomic.Int32](),
|
||||
streamClient: httpClient,
|
||||
repairChan: make(chan RepairRequest, 100), // Initialize the repair channel, max 100 requests buffered
|
||||
}
|
||||
|
||||
@@ -924,7 +931,3 @@ func (c *Cache) Logger() zerolog.Logger {
|
||||
func (c *Cache) GetConfig() config.Debrid {
|
||||
return c.config
|
||||
}
|
||||
|
||||
func (c *Cache) Download(req *http.Request) (*http.Response, error) {
|
||||
return c.httpClient.Do(req)
|
||||
}
|
||||
|
||||
@@ -3,50 +3,50 @@ package store
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
)
|
||||
|
||||
type downloadLinkRequest struct {
|
||||
result string
|
||||
err error
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newDownloadLinkRequest() *downloadLinkRequest {
|
||||
return &downloadLinkRequest{
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *downloadLinkRequest) Complete(result string, err error) {
|
||||
r.result = result
|
||||
r.err = err
|
||||
close(r.done)
|
||||
}
|
||||
|
||||
func (r *downloadLinkRequest) Wait() (string, error) {
|
||||
<-r.done
|
||||
return r.result, r.err
|
||||
}
|
||||
const (
|
||||
MaxLinkFailures = 10
|
||||
)
|
||||
|
||||
func (c *Cache) GetDownloadLink(torrentName, filename, fileLink string) (types.DownloadLink, error) {
|
||||
// Check link cache
|
||||
if dl, err := c.checkDownloadLink(fileLink); err == nil && !dl.Empty() {
|
||||
return dl, nil
|
||||
// Check
|
||||
counter, ok := c.failedLinksCounter.Load(fileLink)
|
||||
if ok && counter.Load() >= MaxLinkFailures {
|
||||
return types.DownloadLink{}, fmt.Errorf("file link %s has failed %d times, not retrying", fileLink, counter.Load())
|
||||
}
|
||||
|
||||
dl, err := c.fetchDownloadLink(torrentName, filename, fileLink)
|
||||
// Use singleflight to deduplicate concurrent requests
|
||||
v, err, _ := c.downloadSG.Do(fileLink, func() (interface{}, error) {
|
||||
// Double-check cache inside singleflight (another goroutine might have filled it)
|
||||
if dl, err := c.checkDownloadLink(fileLink); err == nil && !dl.Empty() {
|
||||
return dl, nil
|
||||
}
|
||||
|
||||
// Fetch the download link
|
||||
dl, err := c.fetchDownloadLink(torrentName, filename, fileLink)
|
||||
if err != nil {
|
||||
c.downloadSG.Forget(fileLink)
|
||||
return types.DownloadLink{}, err
|
||||
}
|
||||
|
||||
if dl.Empty() {
|
||||
c.downloadSG.Forget(fileLink)
|
||||
err = fmt.Errorf("download link is empty for %s in torrent %s", filename, torrentName)
|
||||
return types.DownloadLink{}, err
|
||||
}
|
||||
|
||||
return dl, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return types.DownloadLink{}, err
|
||||
}
|
||||
|
||||
if dl.Empty() {
|
||||
err = fmt.Errorf("download link is empty for %s in torrent %s", filename, torrentName)
|
||||
return types.DownloadLink{}, err
|
||||
}
|
||||
return dl, err
|
||||
return v.(types.DownloadLink), nil
|
||||
}
|
||||
|
||||
func (c *Cache) fetchDownloadLink(torrentName, filename, fileLink string) (types.DownloadLink, error) {
|
||||
@@ -146,7 +146,13 @@ func (c *Cache) checkDownloadLink(link string) (types.DownloadLink, error) {
|
||||
return types.DownloadLink{}, fmt.Errorf("download link not found for %s", link)
|
||||
}
|
||||
|
||||
func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink types.DownloadLink, reason string) {
|
||||
func (c *Cache) MarkLinkAsInvalid(downloadLink types.DownloadLink, reason string) {
|
||||
// Increment file link error counter
|
||||
counter, _ := c.failedLinksCounter.LoadOrCompute(downloadLink.Link, func() (*atomic.Int32, bool) {
|
||||
return &atomic.Int32{}, true
|
||||
})
|
||||
counter.Add(1)
|
||||
|
||||
c.invalidDownloadLinks.Store(downloadLink.DownloadLink, reason)
|
||||
// Remove the download api key from active
|
||||
if reason == "bandwidth_exceeded" {
|
||||
@@ -166,8 +172,7 @@ func (c *Cache) MarkDownloadLinkAsInvalid(downloadLink types.DownloadLink, reaso
|
||||
}
|
||||
|
||||
func (c *Cache) downloadLinkIsInvalid(downloadLink string) bool {
|
||||
if reason, ok := c.invalidDownloadLinks.Load(downloadLink); ok {
|
||||
c.logger.Debug().Msgf("Download link %s is invalid: %s", downloadLink, reason)
|
||||
if _, ok := c.invalidDownloadLinks.Load(downloadLink); ok {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
"sort"
|
||||
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
)
|
||||
|
||||
// MergeFiles merges the files from multiple torrents into a single map.
|
||||
|
||||
239
pkg/debrid/store/stream.go
Normal file
239
pkg/debrid/store/stream.go
Normal file
@@ -0,0 +1,239 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sirrobot01/decypharr/pkg/debrid/types"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxNetworkRetries = 5
|
||||
MaxLinkRetries = 10
|
||||
)
|
||||
|
||||
type StreamError struct {
|
||||
Err error
|
||||
Retryable bool
|
||||
LinkError bool // true if we should try a new link
|
||||
}
|
||||
|
||||
func (e StreamError) Error() string {
|
||||
return e.Err.Error()
|
||||
}
|
||||
|
||||
// isConnectionError checks if the error is related to connection issues
|
||||
func (c *Cache) isConnectionError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
errStr := err.Error()
|
||||
// Check for common connection errors
|
||||
if strings.Contains(errStr, "EOF") ||
|
||||
strings.Contains(errStr, "connection reset by peer") ||
|
||||
strings.Contains(errStr, "broken pipe") ||
|
||||
strings.Contains(errStr, "connection refused") {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check for net.Error types
|
||||
var netErr net.Error
|
||||
return errors.As(err, &netErr)
|
||||
}
|
||||
|
||||
func (c *Cache) Stream(ctx context.Context, start, end int64, linkFunc func() (types.DownloadLink, error)) (*http.Response, error) {
|
||||
|
||||
var lastErr error
|
||||
|
||||
downloadLink, err := linkFunc()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get download link: %w", err)
|
||||
}
|
||||
|
||||
// Outer loop: Link retries
|
||||
for retry := 0; retry < MaxLinkRetries; retry++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
resp, err := c.doRequest(ctx, downloadLink.DownloadLink, start, end)
|
||||
if err != nil {
|
||||
// Network/connection error
|
||||
lastErr = err
|
||||
c.logger.Trace().
|
||||
Int("retries", retry).
|
||||
Err(err).
|
||||
Msg("Network request failed, retrying")
|
||||
|
||||
// Backoff and continue network retry
|
||||
if retry < MaxLinkRetries {
|
||||
backoff := time.Duration(retry+1) * time.Second
|
||||
jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
|
||||
select {
|
||||
case <-time.After(backoff + jitter):
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
return nil, fmt.Errorf("network request failed after retries: %w", lastErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Got response - check status
|
||||
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Bad status code - handle error
|
||||
streamErr := c.handleHTTPError(resp, downloadLink)
|
||||
resp.Body.Close()
|
||||
|
||||
if !streamErr.Retryable {
|
||||
return nil, streamErr // Fatal error
|
||||
}
|
||||
|
||||
if streamErr.LinkError {
|
||||
c.logger.Trace().
|
||||
Int("retries", retry).
|
||||
Msg("Link error, getting fresh link")
|
||||
lastErr = streamErr
|
||||
// Try new link
|
||||
downloadLink, err = linkFunc()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get download link: %w", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Retryable HTTP error (429, 503, etc.) - retry network
|
||||
lastErr = streamErr
|
||||
c.logger.Trace().
|
||||
Err(lastErr).
|
||||
Str("downloadLink", downloadLink.DownloadLink).
|
||||
Str("link", downloadLink.Link).
|
||||
Int("retries", retry).
|
||||
Int("statusCode", resp.StatusCode).
|
||||
Msg("HTTP error, retrying")
|
||||
|
||||
if retry < MaxNetworkRetries-1 {
|
||||
backoff := time.Duration(retry+1) * time.Second
|
||||
jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
|
||||
select {
|
||||
case <-time.After(backoff + jitter):
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("stream failed after %d link retries: %w", MaxLinkRetries, lastErr)
|
||||
}
|
||||
|
||||
func (c *Cache) StreamReader(ctx context.Context, start, end int64, linkFunc func() (types.DownloadLink, error)) (io.ReadCloser, error) {
|
||||
resp, err := c.Stream(ctx, start, end, linkFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Validate we got the expected content
|
||||
if resp.ContentLength == 0 {
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("received empty response")
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
func (c *Cache) doRequest(ctx context.Context, url string, start, end int64) (*http.Response, error) {
|
||||
var lastErr error
|
||||
// Retry loop specifically for connection-level failures (EOF, reset, etc.)
|
||||
for connRetry := 0; connRetry < 3; connRetry++ {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, StreamError{Err: err, Retryable: false}
|
||||
}
|
||||
|
||||
// Set range header
|
||||
if start > 0 || end > 0 {
|
||||
rangeHeader := fmt.Sprintf("bytes=%d-", start)
|
||||
if end > 0 {
|
||||
rangeHeader = fmt.Sprintf("bytes=%d-%d", start, end)
|
||||
}
|
||||
req.Header.Set("Range", rangeHeader)
|
||||
}
|
||||
|
||||
// Set optimized headers for streaming
|
||||
req.Header.Set("Connection", "keep-alive")
|
||||
req.Header.Set("Accept-Encoding", "identity") // Disable compression for streaming
|
||||
req.Header.Set("Cache-Control", "no-cache")
|
||||
|
||||
resp, err := c.streamClient.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
|
||||
// Check if it's a connection error that we should retry
|
||||
if c.isConnectionError(err) && connRetry < 2 {
|
||||
// Brief backoff before retrying with fresh connection
|
||||
time.Sleep(time.Duration(connRetry+1) * 100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, StreamError{Err: err, Retryable: true}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
return nil, StreamError{Err: fmt.Errorf("connection retry exhausted: %w", lastErr), Retryable: true}
|
||||
}
|
||||
|
||||
func (c *Cache) handleHTTPError(resp *http.Response, downloadLink types.DownloadLink) StreamError {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
bodyStr := strings.ToLower(string(body))
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusNotFound:
|
||||
c.MarkLinkAsInvalid(downloadLink, "link_not_found")
|
||||
return StreamError{
|
||||
Err: errors.New("download link not found"),
|
||||
Retryable: true,
|
||||
LinkError: true,
|
||||
}
|
||||
|
||||
case http.StatusServiceUnavailable:
|
||||
if strings.Contains(bodyStr, "bandwidth") || strings.Contains(bodyStr, "traffic") {
|
||||
c.MarkLinkAsInvalid(downloadLink, "bandwidth_exceeded")
|
||||
return StreamError{
|
||||
Err: errors.New("bandwidth limit exceeded"),
|
||||
Retryable: true,
|
||||
LinkError: true,
|
||||
}
|
||||
}
|
||||
fallthrough
|
||||
|
||||
case http.StatusTooManyRequests:
|
||||
return StreamError{
|
||||
Err: fmt.Errorf("HTTP %d: rate limited", resp.StatusCode),
|
||||
Retryable: true,
|
||||
LinkError: false,
|
||||
}
|
||||
|
||||
default:
|
||||
retryable := resp.StatusCode >= 500
|
||||
return StreamError{
|
||||
Err: fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)),
|
||||
Retryable: retryable,
|
||||
LinkError: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/sirrobot01/decypharr/internal/utils"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user