499 lines
13 KiB
Go
499 lines
13 KiB
Go
package webdav
|
||
|
||
import (
|
||
"crypto/tls"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"os"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/sirrobot01/decypharr/pkg/debrid/store"
|
||
)
|
||
|
||
// streamingTransport is the HTTP transport used for upstream download
// requests. Keep-alives are disabled, so every request uses its own
// connection, and HTTP/2 is switched off.
//
// NOTE(review): InsecureSkipVerify turns off TLS certificate verification for
// every upstream host — confirm this is intentional.
var streamingTransport = &http.Transport{
	// Protocol selection. The empty, non-nil TLSNextProto map is what truly
	// blocks HTTP/2; ForceAttemptHTTP2 alone is not enough.
	ForceAttemptHTTP2: false,
	TLSNextProto:      map[string]func(string, *tls.Conn) http.RoundTripper{},
	DisableKeepAlives: true, // close after each request

	TLSClientConfig: &tls.Config{InsecureSkipVerify: true},

	// Connection limits.
	MaxConnsPerHost:     200,
	MaxIdleConns:        200,
	MaxIdleConnsPerHost: 100,

	// Timeouts.
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ResponseHeaderTimeout: 60 * time.Second, // give the upstream a minute to send headers
	ExpectContinueTimeout: 1 * time.Second,
}
|
||
|
||
var sharedClient = &http.Client{
|
||
Transport: streamingTransport,
|
||
Timeout: 0,
|
||
}
|
||
|
||
// streamError is the error type returned by the streaming code paths. It
// carries the underlying error together with the HTTP status that should be
// reported for it.
type streamError struct {
	Err                   error // underlying cause; exposed through Error and Unwrap
	StatusCode            int   // HTTP status to report; 0 is used once response data has been written
	IsClientDisconnection bool  // set when the failure was attributed to the client disconnecting
}
|
||
|
||
func (e *streamError) Error() string {
|
||
return e.Err.Error()
|
||
}
|
||
|
||
func (e *streamError) Unwrap() error {
|
||
return e.Err
|
||
}
|
||
|
||
type File struct {
|
||
name string
|
||
torrentName string
|
||
link string
|
||
downloadLink string
|
||
size int64
|
||
isDir bool
|
||
fileId string
|
||
isRar bool
|
||
metadataOnly bool
|
||
content []byte
|
||
children []os.FileInfo // For directories
|
||
cache *store.Cache
|
||
modTime time.Time
|
||
|
||
// Minimal state for interface compliance only
|
||
readOffset int64 // Only used for Read() method compliance
|
||
}
|
||
|
||
// File interface implementations for File
|
||
|
||
func (f *File) Close() error {
|
||
if f.isDir {
|
||
return nil // No resources to close for directories
|
||
}
|
||
|
||
// For files, we don't have any resources to close either
|
||
// This is just to satisfy the os.File interface
|
||
f.content = nil
|
||
f.children = nil
|
||
f.downloadLink = ""
|
||
f.readOffset = 0
|
||
return nil
|
||
}
|
||
|
||
func (f *File) getDownloadLink() (string, error) {
|
||
// Check if we already have a final URL cached
|
||
|
||
if f.downloadLink != "" && isValidURL(f.downloadLink) {
|
||
return f.downloadLink, nil
|
||
}
|
||
downloadLink, err := f.cache.GetDownloadLink(f.torrentName, f.name, f.link)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
if downloadLink != "" && isValidURL(downloadLink) {
|
||
f.downloadLink = downloadLink
|
||
return downloadLink, nil
|
||
}
|
||
return "", os.ErrNotExist
|
||
}
|
||
|
||
func (f *File) getDownloadByteRange() (*[2]int64, error) {
|
||
byteRange, err := f.cache.GetDownloadByteRange(f.torrentName, f.name)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return byteRange, nil
|
||
}
|
||
|
||
// setVideoStreamingHeaders sets the necessary headers for video streaming
|
||
// It returns error and a boolean indicating if the request is a range request
|
||
func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error {
|
||
content := f.content
|
||
size := int64(len(content))
|
||
|
||
// Handle range requests for preloaded content
|
||
if rangeHeader := r.Header.Get("Range"); rangeHeader != "" {
|
||
ranges, err := parseRange(rangeHeader, size)
|
||
if err != nil || len(ranges) != 1 {
|
||
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", size))
|
||
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
|
||
}
|
||
|
||
start, end := ranges[0].start, ranges[0].end
|
||
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
|
||
w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1))
|
||
w.Header().Set("Accept-Ranges", "bytes")
|
||
w.WriteHeader(http.StatusPartialContent)
|
||
|
||
_, err = w.Write(content[start : end+1])
|
||
return err
|
||
}
|
||
|
||
// Full content
|
||
w.Header().Set("Content-Length", fmt.Sprintf("%d", size))
|
||
w.Header().Set("Accept-Ranges", "bytes")
|
||
w.WriteHeader(http.StatusOK)
|
||
|
||
_, err := w.Write(content)
|
||
return err
|
||
}
|
||
|
||
func (f *File) StreamResponse(w http.ResponseWriter, r *http.Request) error {
|
||
// Handle preloaded content files
|
||
if f.content != nil {
|
||
return f.servePreloadedContent(w, r)
|
||
}
|
||
|
||
// Try streaming with retry logic
|
||
return f.streamWithRetry(w, r, 0)
|
||
}
|
||
|
||
func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error {
|
||
const maxRetries = 3
|
||
_log := f.cache.Logger()
|
||
|
||
// Get download link (with caching optimization)
|
||
downloadLink, err := f.getDownloadLink()
|
||
if err != nil {
|
||
return &streamError{Err: err, StatusCode: http.StatusPreconditionFailed}
|
||
}
|
||
|
||
if downloadLink == "" {
|
||
return &streamError{Err: fmt.Errorf("empty download link"), StatusCode: http.StatusNotFound}
|
||
}
|
||
|
||
// Create upstream request with streaming optimizations
|
||
upstreamReq, err := http.NewRequest("GET", downloadLink, nil)
|
||
if err != nil {
|
||
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
|
||
}
|
||
|
||
setVideoStreamingHeaders(upstreamReq)
|
||
|
||
// Handle range requests (critical for video seeking)
|
||
isRangeRequest := f.handleRangeRequest(upstreamReq, r, w)
|
||
if isRangeRequest == -1 {
|
||
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
|
||
}
|
||
|
||
resp, err := sharedClient.Do(upstreamReq)
|
||
if err != nil {
|
||
return &streamError{Err: err, StatusCode: http.StatusServiceUnavailable}
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// Handle upstream errors with retry logic
|
||
shouldRetry, retryErr := f.handleUpstream(resp, retryCount, maxRetries)
|
||
if shouldRetry && retryCount < maxRetries {
|
||
// Retry with new download link
|
||
_log.Debug().
|
||
Int("retry_count", retryCount+1).
|
||
Str("file", f.name).
|
||
Msg("Retrying stream request")
|
||
return f.streamWithRetry(w, r, retryCount+1)
|
||
}
|
||
if retryErr != nil {
|
||
return retryErr
|
||
}
|
||
|
||
// Determine status code based on range request
|
||
statusCode := http.StatusOK
|
||
if isRangeRequest == 1 {
|
||
statusCode = http.StatusPartialContent
|
||
}
|
||
|
||
// Set headers before streaming
|
||
if contentLength := resp.Header.Get("Content-Length"); contentLength != "" {
|
||
w.Header().Set("Content-Length", contentLength)
|
||
}
|
||
|
||
if contentRange := resp.Header.Get("Content-Range"); contentRange != "" && isRangeRequest == 1 {
|
||
w.Header().Set("Content-Range", contentRange)
|
||
}
|
||
|
||
if err := f.streamBuffer(w, resp.Body, statusCode); err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (f *File) streamBuffer(w http.ResponseWriter, src io.Reader, statusCode int) error {
|
||
flusher, ok := w.(http.Flusher)
|
||
if !ok {
|
||
return fmt.Errorf("response does not support flushing")
|
||
}
|
||
|
||
smallBuf := make([]byte, 64*1024) // 64 KB
|
||
if n, err := src.Read(smallBuf); n > 0 {
|
||
// Write status code just before first successful write
|
||
w.WriteHeader(statusCode)
|
||
|
||
if _, werr := w.Write(smallBuf[:n]); werr != nil {
|
||
if isClientDisconnection(werr) {
|
||
return &streamError{Err: werr, StatusCode: 0, IsClientDisconnection: true}
|
||
}
|
||
// Headers already sent, can't send HTTP error response
|
||
return &streamError{Err: werr, StatusCode: 0, IsClientDisconnection: false}
|
||
}
|
||
flusher.Flush()
|
||
} else if err != nil && err != io.EOF {
|
||
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
|
||
}
|
||
|
||
buf := make([]byte, 256*1024) // 256 KB
|
||
for {
|
||
n, readErr := src.Read(buf)
|
||
if n > 0 {
|
||
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
|
||
if isClientDisconnection(writeErr) {
|
||
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true}
|
||
}
|
||
// Headers already sent, can't send HTTP error response
|
||
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: false}
|
||
}
|
||
flusher.Flush()
|
||
}
|
||
if readErr != nil {
|
||
if readErr == io.EOF {
|
||
return nil
|
||
}
|
||
if isClientDisconnection(readErr) {
|
||
return &streamError{Err: readErr, StatusCode: 0, IsClientDisconnection: true}
|
||
}
|
||
return readErr
|
||
}
|
||
}
|
||
}
|
||
|
||
func (f *File) handleUpstream(resp *http.Response, retryCount, maxRetries int) (shouldRetry bool, err error) {
|
||
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent {
|
||
return false, nil
|
||
}
|
||
|
||
_log := f.cache.Logger()
|
||
|
||
// Clean up response body properly
|
||
cleanupResp := func(resp *http.Response) {
|
||
if resp.Body != nil {
|
||
_, _ = io.Copy(io.Discard, resp.Body)
|
||
resp.Body.Close()
|
||
}
|
||
}
|
||
|
||
switch resp.StatusCode {
|
||
case http.StatusServiceUnavailable:
|
||
// Read the body to check for specific error messages
|
||
body, readErr := io.ReadAll(resp.Body)
|
||
cleanupResp(resp)
|
||
|
||
if readErr != nil {
|
||
_log.Error().Err(readErr).Msg("Failed to read response body")
|
||
return false, &streamError{
|
||
Err: fmt.Errorf("failed to read error response: %w", readErr),
|
||
StatusCode: http.StatusServiceUnavailable,
|
||
}
|
||
}
|
||
|
||
bodyStr := string(body)
|
||
if strings.Contains(bodyStr, "you have exceeded your traffic") {
|
||
_log.Debug().
|
||
Str("file", f.name).
|
||
Int("retry_count", retryCount).
|
||
Msg("Bandwidth exceeded. Marking link as invalid")
|
||
|
||
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "bandwidth_exceeded")
|
||
|
||
// Retry with a different API key if available and we haven't exceeded retries
|
||
if retryCount < maxRetries {
|
||
return true, nil
|
||
}
|
||
|
||
return false, &streamError{
|
||
Err: fmt.Errorf("bandwidth exceeded after %d retries", retryCount),
|
||
StatusCode: http.StatusServiceUnavailable,
|
||
}
|
||
}
|
||
|
||
return false, &streamError{
|
||
Err: fmt.Errorf("service unavailable: %s", bodyStr),
|
||
StatusCode: http.StatusServiceUnavailable,
|
||
}
|
||
|
||
case http.StatusNotFound:
|
||
cleanupResp(resp)
|
||
|
||
_log.Debug().
|
||
Str("file", f.name).
|
||
Int("retry_count", retryCount).
|
||
Msg("Link not found (404). Marking link as invalid and regenerating")
|
||
|
||
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "link_not_found")
|
||
|
||
// Try to regenerate download link if we haven't exceeded retries
|
||
if retryCount < maxRetries {
|
||
// Clear cached link to force regeneration
|
||
f.downloadLink = ""
|
||
return true, nil
|
||
}
|
||
|
||
return false, &streamError{
|
||
Err: fmt.Errorf("file not found after %d retries", retryCount),
|
||
StatusCode: http.StatusNotFound,
|
||
}
|
||
|
||
default:
|
||
body, _ := io.ReadAll(resp.Body)
|
||
cleanupResp(resp)
|
||
|
||
_log.Error().
|
||
Int("status_code", resp.StatusCode).
|
||
Str("file", f.name).
|
||
Str("response_body", string(body)).
|
||
Msg("Unexpected upstream error")
|
||
|
||
return false, &streamError{
|
||
Err: fmt.Errorf("upstream error %d: %s", resp.StatusCode, string(body)),
|
||
StatusCode: http.StatusBadGateway,
|
||
}
|
||
}
|
||
}
|
||
|
||
func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w http.ResponseWriter) int {
|
||
rangeHeader := r.Header.Get("Range")
|
||
if rangeHeader == "" {
|
||
// For video files, apply byte range if exists
|
||
if byteRange, _ := f.getDownloadByteRange(); byteRange != nil {
|
||
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", byteRange[0], byteRange[1]))
|
||
}
|
||
return 0 // No range request
|
||
}
|
||
|
||
// Parse range request
|
||
ranges, err := parseRange(rangeHeader, f.size)
|
||
if err != nil || len(ranges) != 1 {
|
||
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", f.size))
|
||
return -1 // Invalid range
|
||
}
|
||
|
||
// Apply byte range offset if exists
|
||
byteRange, _ := f.getDownloadByteRange()
|
||
start, end := ranges[0].start, ranges[0].end
|
||
|
||
if byteRange != nil {
|
||
start += byteRange[0]
|
||
end += byteRange[0]
|
||
}
|
||
|
||
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
|
||
return 1 // Valid range request
|
||
}
|
||
|
||
/*
The methods below implement the os.File-style interface for the File type.
Only Stat and Readdir are used.
*/
|
||
|
||
func (f *File) Stat() (os.FileInfo, error) {
|
||
if f.isDir {
|
||
return &FileInfo{
|
||
name: f.name,
|
||
size: 0,
|
||
mode: 0755 | os.ModeDir,
|
||
modTime: f.modTime,
|
||
isDir: true,
|
||
}, nil
|
||
}
|
||
|
||
return &FileInfo{
|
||
name: f.name,
|
||
size: f.size,
|
||
mode: 0644,
|
||
modTime: f.modTime,
|
||
isDir: false,
|
||
}, nil
|
||
}
|
||
|
||
func (f *File) Read(p []byte) (n int, err error) {
|
||
if f.isDir {
|
||
return 0, os.ErrInvalid
|
||
}
|
||
|
||
if f.metadataOnly {
|
||
return 0, io.EOF
|
||
}
|
||
|
||
// For preloaded content files (like version.txt)
|
||
if f.content != nil {
|
||
if f.readOffset >= int64(len(f.content)) {
|
||
return 0, io.EOF
|
||
}
|
||
n = copy(p, f.content[f.readOffset:])
|
||
f.readOffset += int64(n)
|
||
return n, nil
|
||
}
|
||
|
||
// For streaming files, return an error to force use of StreamResponse
|
||
return 0, fmt.Errorf("use StreamResponse method for streaming files")
|
||
}
|
||
|
||
func (f *File) Seek(offset int64, whence int) (int64, error) {
|
||
if f.isDir {
|
||
return 0, os.ErrInvalid
|
||
}
|
||
|
||
// Only handle seeking for preloaded content
|
||
if f.content != nil {
|
||
newOffset := f.readOffset
|
||
switch whence {
|
||
case io.SeekStart:
|
||
newOffset = offset
|
||
case io.SeekCurrent:
|
||
newOffset += offset
|
||
case io.SeekEnd:
|
||
newOffset = int64(len(f.content)) + offset
|
||
default:
|
||
return 0, os.ErrInvalid
|
||
}
|
||
|
||
if newOffset < 0 {
|
||
newOffset = 0
|
||
}
|
||
if newOffset > int64(len(f.content)) {
|
||
newOffset = int64(len(f.content))
|
||
}
|
||
|
||
f.readOffset = newOffset
|
||
return f.readOffset, nil
|
||
}
|
||
|
||
// For streaming files, return error to force use of StreamResponse
|
||
return 0, fmt.Errorf("use StreamResponse method for streaming files")
|
||
}
|
||
|
||
func (f *File) Write(p []byte) (n int, err error) {
|
||
return 0, os.ErrPermission
|
||
}
|
||
|
||
func (f *File) Readdir(count int) ([]os.FileInfo, error) {
|
||
if !f.isDir {
|
||
return nil, os.ErrInvalid
|
||
}
|
||
|
||
if count <= 0 {
|
||
return f.children, nil
|
||
}
|
||
|
||
if len(f.children) == 0 {
|
||
return nil, io.EOF
|
||
}
|
||
|
||
if count > len(f.children) {
|
||
count = len(f.children)
|
||
}
|
||
|
||
files := f.children[:count]
|
||
f.children = f.children[count:]
|
||
return files, nil
|
||
}
|