Files
decypharr/pkg/webdav/file.go
2025-08-05 05:01:34 +01:00

499 lines
13 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package webdav
import (
"crypto/tls"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
"github.com/sirrobot01/decypharr/pkg/debrid/store"
)
var streamingTransport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 200,
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 60 * time.Second, // give the upstream a minute to send headers
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: true, // close after each request
ForceAttemptHTTP2: false, // dont speak HTTP/2
// this line is what truly blocks HTTP/2:
TLSNextProto: make(map[string]func(string, *tls.Conn) http.RoundTripper),
}
var sharedClient = &http.Client{
Transport: streamingTransport,
Timeout: 0,
}
type streamError struct {
Err error
StatusCode int
IsClientDisconnection bool
}
func (e *streamError) Error() string {
return e.Err.Error()
}
func (e *streamError) Unwrap() error {
return e.Err
}
type File struct {
name string
torrentName string
link string
downloadLink string
size int64
isDir bool
fileId string
isRar bool
metadataOnly bool
content []byte
children []os.FileInfo // For directories
cache *store.Cache
modTime time.Time
// Minimal state for interface compliance only
readOffset int64 // Only used for Read() method compliance
}
// File interface implementations for File
func (f *File) Close() error {
if f.isDir {
return nil // No resources to close for directories
}
// For files, we don't have any resources to close either
// This is just to satisfy the os.File interface
f.content = nil
f.children = nil
f.downloadLink = ""
f.readOffset = 0
return nil
}
func (f *File) getDownloadLink() (string, error) {
// Check if we already have a final URL cached
if f.downloadLink != "" && isValidURL(f.downloadLink) {
return f.downloadLink, nil
}
downloadLink, err := f.cache.GetDownloadLink(f.torrentName, f.name, f.link)
if err != nil {
return "", err
}
if downloadLink != "" && isValidURL(downloadLink) {
f.downloadLink = downloadLink
return downloadLink, nil
}
return "", os.ErrNotExist
}
func (f *File) getDownloadByteRange() (*[2]int64, error) {
byteRange, err := f.cache.GetDownloadByteRange(f.torrentName, f.name)
if err != nil {
return nil, err
}
return byteRange, nil
}
// setVideoStreamingHeaders sets the necessary headers for video streaming
// It returns error and a boolean indicating if the request is a range request
func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error {
content := f.content
size := int64(len(content))
// Handle range requests for preloaded content
if rangeHeader := r.Header.Get("Range"); rangeHeader != "" {
ranges, err := parseRange(rangeHeader, size)
if err != nil || len(ranges) != 1 {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", size))
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
}
start, end := ranges[0].start, ranges[0].end
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusPartialContent)
_, err = w.Write(content[start : end+1])
return err
}
// Full content
w.Header().Set("Content-Length", fmt.Sprintf("%d", size))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusOK)
_, err := w.Write(content)
return err
}
func (f *File) StreamResponse(w http.ResponseWriter, r *http.Request) error {
// Handle preloaded content files
if f.content != nil {
return f.servePreloadedContent(w, r)
}
// Try streaming with retry logic
return f.streamWithRetry(w, r, 0)
}
func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error {
const maxRetries = 3
_log := f.cache.Logger()
// Get download link (with caching optimization)
downloadLink, err := f.getDownloadLink()
if err != nil {
return &streamError{Err: err, StatusCode: http.StatusPreconditionFailed}
}
if downloadLink == "" {
return &streamError{Err: fmt.Errorf("empty download link"), StatusCode: http.StatusNotFound}
}
// Create upstream request with streaming optimizations
upstreamReq, err := http.NewRequest("GET", downloadLink, nil)
if err != nil {
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
}
setVideoStreamingHeaders(upstreamReq)
// Handle range requests (critical for video seeking)
isRangeRequest := f.handleRangeRequest(upstreamReq, r, w)
if isRangeRequest == -1 {
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
}
resp, err := sharedClient.Do(upstreamReq)
if err != nil {
return &streamError{Err: err, StatusCode: http.StatusServiceUnavailable}
}
defer resp.Body.Close()
// Handle upstream errors with retry logic
shouldRetry, retryErr := f.handleUpstream(resp, retryCount, maxRetries)
if shouldRetry && retryCount < maxRetries {
// Retry with new download link
_log.Debug().
Int("retry_count", retryCount+1).
Str("file", f.name).
Msg("Retrying stream request")
return f.streamWithRetry(w, r, retryCount+1)
}
if retryErr != nil {
return retryErr
}
// Determine status code based on range request
statusCode := http.StatusOK
if isRangeRequest == 1 {
statusCode = http.StatusPartialContent
}
// Set headers before streaming
if contentLength := resp.Header.Get("Content-Length"); contentLength != "" {
w.Header().Set("Content-Length", contentLength)
}
if contentRange := resp.Header.Get("Content-Range"); contentRange != "" && isRangeRequest == 1 {
w.Header().Set("Content-Range", contentRange)
}
if err := f.streamBuffer(w, resp.Body, statusCode); err != nil {
return err
}
return nil
}
func (f *File) streamBuffer(w http.ResponseWriter, src io.Reader, statusCode int) error {
flusher, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("response does not support flushing")
}
smallBuf := make([]byte, 64*1024) // 64 KB
if n, err := src.Read(smallBuf); n > 0 {
// Write status code just before first successful write
w.WriteHeader(statusCode)
if _, werr := w.Write(smallBuf[:n]); werr != nil {
if isClientDisconnection(werr) {
return &streamError{Err: werr, StatusCode: 0, IsClientDisconnection: true}
}
// Headers already sent, can't send HTTP error response
return &streamError{Err: werr, StatusCode: 0, IsClientDisconnection: false}
}
flusher.Flush()
} else if err != nil && err != io.EOF {
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
}
buf := make([]byte, 256*1024) // 256 KB
for {
n, readErr := src.Read(buf)
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
if isClientDisconnection(writeErr) {
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true}
}
// Headers already sent, can't send HTTP error response
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: false}
}
flusher.Flush()
}
if readErr != nil {
if readErr == io.EOF {
return nil
}
if isClientDisconnection(readErr) {
return &streamError{Err: readErr, StatusCode: 0, IsClientDisconnection: true}
}
return readErr
}
}
}
func (f *File) handleUpstream(resp *http.Response, retryCount, maxRetries int) (shouldRetry bool, err error) {
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent {
return false, nil
}
_log := f.cache.Logger()
// Clean up response body properly
cleanupResp := func(resp *http.Response) {
if resp.Body != nil {
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
}
switch resp.StatusCode {
case http.StatusServiceUnavailable:
// Read the body to check for specific error messages
body, readErr := io.ReadAll(resp.Body)
cleanupResp(resp)
if readErr != nil {
_log.Error().Err(readErr).Msg("Failed to read response body")
return false, &streamError{
Err: fmt.Errorf("failed to read error response: %w", readErr),
StatusCode: http.StatusServiceUnavailable,
}
}
bodyStr := string(body)
if strings.Contains(bodyStr, "you have exceeded your traffic") {
_log.Debug().
Str("file", f.name).
Int("retry_count", retryCount).
Msg("Bandwidth exceeded. Marking link as invalid")
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "bandwidth_exceeded")
// Retry with a different API key if available and we haven't exceeded retries
if retryCount < maxRetries {
return true, nil
}
return false, &streamError{
Err: fmt.Errorf("bandwidth exceeded after %d retries", retryCount),
StatusCode: http.StatusServiceUnavailable,
}
}
return false, &streamError{
Err: fmt.Errorf("service unavailable: %s", bodyStr),
StatusCode: http.StatusServiceUnavailable,
}
case http.StatusNotFound:
cleanupResp(resp)
_log.Debug().
Str("file", f.name).
Int("retry_count", retryCount).
Msg("Link not found (404). Marking link as invalid and regenerating")
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "link_not_found")
// Try to regenerate download link if we haven't exceeded retries
if retryCount < maxRetries {
// Clear cached link to force regeneration
f.downloadLink = ""
return true, nil
}
return false, &streamError{
Err: fmt.Errorf("file not found after %d retries", retryCount),
StatusCode: http.StatusNotFound,
}
default:
body, _ := io.ReadAll(resp.Body)
cleanupResp(resp)
_log.Error().
Int("status_code", resp.StatusCode).
Str("file", f.name).
Str("response_body", string(body)).
Msg("Unexpected upstream error")
return false, &streamError{
Err: fmt.Errorf("upstream error %d: %s", resp.StatusCode, string(body)),
StatusCode: http.StatusBadGateway,
}
}
}
func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w http.ResponseWriter) int {
rangeHeader := r.Header.Get("Range")
if rangeHeader == "" {
// For video files, apply byte range if exists
if byteRange, _ := f.getDownloadByteRange(); byteRange != nil {
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", byteRange[0], byteRange[1]))
}
return 0 // No range request
}
// Parse range request
ranges, err := parseRange(rangeHeader, f.size)
if err != nil || len(ranges) != 1 {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", f.size))
return -1 // Invalid range
}
// Apply byte range offset if exists
byteRange, _ := f.getDownloadByteRange()
start, end := ranges[0].start, ranges[0].end
if byteRange != nil {
start += byteRange[0]
end += byteRange[0]
}
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
return 1 // Valid range request
}
/*
These are the methods that implement the os.File interface for the File type.
Only Stat and ReadDir are used
*/
func (f *File) Stat() (os.FileInfo, error) {
if f.isDir {
return &FileInfo{
name: f.name,
size: 0,
mode: 0755 | os.ModeDir,
modTime: f.modTime,
isDir: true,
}, nil
}
return &FileInfo{
name: f.name,
size: f.size,
mode: 0644,
modTime: f.modTime,
isDir: false,
}, nil
}
func (f *File) Read(p []byte) (n int, err error) {
if f.isDir {
return 0, os.ErrInvalid
}
if f.metadataOnly {
return 0, io.EOF
}
// For preloaded content files (like version.txt)
if f.content != nil {
if f.readOffset >= int64(len(f.content)) {
return 0, io.EOF
}
n = copy(p, f.content[f.readOffset:])
f.readOffset += int64(n)
return n, nil
}
// For streaming files, return an error to force use of StreamResponse
return 0, fmt.Errorf("use StreamResponse method for streaming files")
}
func (f *File) Seek(offset int64, whence int) (int64, error) {
if f.isDir {
return 0, os.ErrInvalid
}
// Only handle seeking for preloaded content
if f.content != nil {
newOffset := f.readOffset
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset += offset
case io.SeekEnd:
newOffset = int64(len(f.content)) + offset
default:
return 0, os.ErrInvalid
}
if newOffset < 0 {
newOffset = 0
}
if newOffset > int64(len(f.content)) {
newOffset = int64(len(f.content))
}
f.readOffset = newOffset
return f.readOffset, nil
}
// For streaming files, return error to force use of StreamResponse
return 0, fmt.Errorf("use StreamResponse method for streaming files")
}
func (f *File) Write(p []byte) (n int, err error) {
return 0, os.ErrPermission
}
func (f *File) Readdir(count int) ([]os.FileInfo, error) {
if !f.isDir {
return nil, os.ErrInvalid
}
if count <= 0 {
return f.children, nil
}
if len(f.children) == 0 {
return nil, io.EOF
}
if count > len(f.children) {
count = len(f.children)
}
files := f.children[:count]
f.children = f.children[count:]
return files, nil
}