478 lines
12 KiB
Go
478 lines
12 KiB
Go
package webdav
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/sirrobot01/decypharr/pkg/debrid/store"
|
|
)
|
|
|
|
// streamError is the error type returned by the streaming code paths in this
// file. It pairs the underlying failure with the HTTP status the caller should
// report for it.
type streamError struct {
	// Err is the underlying cause; it is exposed through Unwrap.
	Err error
	// StatusCode is the HTTP status to report. The streaming code in this
	// file sets it to 0 when response headers have already been sent and no
	// status can be reported any more.
	StatusCode int
	// IsClientDisconnection marks failures attributed to the client going
	// away rather than to the upstream or the server.
	IsClientDisconnection bool
}

// Error returns the message of the wrapped error. A nil receiver or a nil Err
// yields a generic message instead of panicking on a nil dereference.
func (e *streamError) Error() string {
	if e == nil || e.Err == nil {
		return "stream error"
	}
	return e.Err.Error()
}

// Unwrap returns the wrapped error so errors.Is and errors.As can inspect the
// cause. It returns nil for a nil receiver.
func (e *streamError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.Err
}
|
|
|
|
type File struct {
|
|
name string
|
|
torrentName string
|
|
link string
|
|
downloadLink string
|
|
size int64
|
|
isDir bool
|
|
fileId string
|
|
isRar bool
|
|
metadataOnly bool
|
|
content []byte
|
|
children []os.FileInfo // For directories
|
|
cache *store.Cache
|
|
modTime time.Time
|
|
|
|
// Minimal state for interface compliance only
|
|
readOffset int64 // Only used for Read() method compliance
|
|
}
|
|
|
|
// File interface implementations for File
|
|
|
|
func (f *File) Close() error {
|
|
if f.isDir {
|
|
return nil // No resources to close for directories
|
|
}
|
|
|
|
// For files, we don't have any resources to close either
|
|
// This is just to satisfy the os.File interface
|
|
f.content = nil
|
|
f.children = nil
|
|
f.downloadLink = ""
|
|
f.readOffset = 0
|
|
return nil
|
|
}
|
|
|
|
func (f *File) getDownloadLink() (string, error) {
|
|
// Check if we already have a final URL cached
|
|
|
|
if f.downloadLink != "" && isValidURL(f.downloadLink) {
|
|
return f.downloadLink, nil
|
|
}
|
|
downloadLink, err := f.cache.GetDownloadLink(f.torrentName, f.name, f.link)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if downloadLink != "" && isValidURL(downloadLink) {
|
|
f.downloadLink = downloadLink
|
|
return downloadLink, nil
|
|
}
|
|
return "", os.ErrNotExist
|
|
}
|
|
|
|
func (f *File) getDownloadByteRange() (*[2]int64, error) {
|
|
byteRange, err := f.cache.GetDownloadByteRange(f.torrentName, f.name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return byteRange, nil
|
|
}
|
|
|
|
// setVideoStreamingHeaders sets the necessary headers for video streaming
|
|
// It returns error and a boolean indicating if the request is a range request
|
|
func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error {
|
|
content := f.content
|
|
size := int64(len(content))
|
|
|
|
// Handle range requests for preloaded content
|
|
if rangeHeader := r.Header.Get("Range"); rangeHeader != "" {
|
|
ranges, err := parseRange(rangeHeader, size)
|
|
if err != nil || len(ranges) != 1 {
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", size))
|
|
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
|
|
}
|
|
|
|
start, end := ranges[0].start, ranges[0].end
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
|
|
w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1))
|
|
w.Header().Set("Accept-Ranges", "bytes")
|
|
w.WriteHeader(http.StatusPartialContent)
|
|
|
|
_, err = w.Write(content[start : end+1])
|
|
return err
|
|
}
|
|
|
|
// Full content
|
|
w.Header().Set("Content-Length", fmt.Sprintf("%d", size))
|
|
w.Header().Set("Accept-Ranges", "bytes")
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
_, err := w.Write(content)
|
|
return err
|
|
}
|
|
|
|
func (f *File) StreamResponse(w http.ResponseWriter, r *http.Request) error {
|
|
// Handle preloaded content files
|
|
if f.content != nil {
|
|
return f.servePreloadedContent(w, r)
|
|
}
|
|
|
|
// Try streaming with retry logic
|
|
return f.streamWithRetry(w, r, 0)
|
|
}
|
|
|
|
func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error {
|
|
const maxRetries = 3
|
|
_log := f.cache.Logger()
|
|
|
|
// Get download link (with caching optimization)
|
|
downloadLink, err := f.getDownloadLink()
|
|
if err != nil {
|
|
return &streamError{Err: err, StatusCode: http.StatusPreconditionFailed}
|
|
}
|
|
|
|
if downloadLink == "" {
|
|
return &streamError{Err: fmt.Errorf("empty download link"), StatusCode: http.StatusNotFound}
|
|
}
|
|
|
|
// Create upstream request with streaming optimizations
|
|
upstreamReq, err := http.NewRequest("GET", downloadLink, nil)
|
|
if err != nil {
|
|
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
|
|
}
|
|
|
|
setVideoStreamingHeaders(upstreamReq)
|
|
|
|
// Handle range requests (critical for video seeking)
|
|
isRangeRequest := f.handleRangeRequest(upstreamReq, r, w)
|
|
if isRangeRequest == -1 {
|
|
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
|
|
}
|
|
|
|
resp, err := f.cache.Download(upstreamReq)
|
|
if err != nil {
|
|
return &streamError{Err: err, StatusCode: http.StatusServiceUnavailable}
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Handle upstream errors with retry logic
|
|
shouldRetry, retryErr := f.handleUpstream(resp, retryCount, maxRetries)
|
|
if shouldRetry && retryCount < maxRetries {
|
|
// Retry with new download link
|
|
_log.Debug().
|
|
Int("retry_count", retryCount+1).
|
|
Str("file", f.name).
|
|
Msg("Retrying stream request")
|
|
return f.streamWithRetry(w, r, retryCount+1)
|
|
}
|
|
if retryErr != nil {
|
|
return retryErr
|
|
}
|
|
|
|
// Determine status code based on range request
|
|
statusCode := http.StatusOK
|
|
if isRangeRequest == 1 {
|
|
statusCode = http.StatusPartialContent
|
|
}
|
|
|
|
// Set headers before streaming
|
|
if contentLength := resp.Header.Get("Content-Length"); contentLength != "" {
|
|
w.Header().Set("Content-Length", contentLength)
|
|
}
|
|
|
|
if contentRange := resp.Header.Get("Content-Range"); contentRange != "" && isRangeRequest == 1 {
|
|
w.Header().Set("Content-Range", contentRange)
|
|
}
|
|
|
|
if err := f.streamBuffer(w, resp.Body, statusCode); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *File) streamBuffer(w http.ResponseWriter, src io.Reader, statusCode int) error {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
return fmt.Errorf("response does not support flushing")
|
|
}
|
|
|
|
smallBuf := make([]byte, 64*1024) // 64 KB
|
|
if n, err := src.Read(smallBuf); n > 0 {
|
|
// Write status code just before first successful write
|
|
w.WriteHeader(statusCode)
|
|
|
|
if _, werr := w.Write(smallBuf[:n]); werr != nil {
|
|
if isClientDisconnection(werr) {
|
|
return &streamError{Err: werr, StatusCode: 0, IsClientDisconnection: true}
|
|
}
|
|
// Headers already sent, can't send HTTP error response
|
|
return &streamError{Err: werr, StatusCode: 0, IsClientDisconnection: false}
|
|
}
|
|
flusher.Flush()
|
|
} else if err != nil && err != io.EOF {
|
|
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
|
|
}
|
|
|
|
buf := make([]byte, 256*1024) // 256 KB
|
|
for {
|
|
n, readErr := src.Read(buf)
|
|
if n > 0 {
|
|
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
|
|
if isClientDisconnection(writeErr) {
|
|
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true}
|
|
}
|
|
// Headers already sent, can't send HTTP error response
|
|
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: false}
|
|
}
|
|
flusher.Flush()
|
|
}
|
|
if readErr != nil {
|
|
if readErr == io.EOF {
|
|
return nil
|
|
}
|
|
if isClientDisconnection(readErr) {
|
|
return &streamError{Err: readErr, StatusCode: 0, IsClientDisconnection: true}
|
|
}
|
|
return readErr
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *File) handleUpstream(resp *http.Response, retryCount, maxRetries int) (shouldRetry bool, err error) {
|
|
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent {
|
|
return false, nil
|
|
}
|
|
|
|
_log := f.cache.Logger()
|
|
|
|
// Clean up response body properly
|
|
cleanupResp := func(resp *http.Response) {
|
|
if resp.Body != nil {
|
|
_, _ = io.Copy(io.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
}
|
|
}
|
|
|
|
switch resp.StatusCode {
|
|
case http.StatusServiceUnavailable:
|
|
// Read the body to check for specific error messages
|
|
body, readErr := io.ReadAll(resp.Body)
|
|
cleanupResp(resp)
|
|
|
|
if readErr != nil {
|
|
_log.Error().Err(readErr).Msg("Failed to read response body")
|
|
return false, &streamError{
|
|
Err: fmt.Errorf("failed to read error response: %w", readErr),
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
}
|
|
|
|
bodyStr := string(body)
|
|
if strings.Contains(bodyStr, "you have exceeded your traffic") {
|
|
_log.Debug().
|
|
Str("file", f.name).
|
|
Int("retry_count", retryCount).
|
|
Msg("Bandwidth exceeded. Marking link as invalid")
|
|
|
|
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "bandwidth_exceeded")
|
|
|
|
// Retry with a different API key if available and we haven't exceeded retries
|
|
if retryCount < maxRetries {
|
|
return true, nil
|
|
}
|
|
|
|
return false, &streamError{
|
|
Err: fmt.Errorf("bandwidth exceeded after %d retries", retryCount),
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
}
|
|
|
|
return false, &streamError{
|
|
Err: fmt.Errorf("service unavailable: %s", bodyStr),
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
|
|
case http.StatusNotFound:
|
|
cleanupResp(resp)
|
|
|
|
_log.Debug().
|
|
Str("file", f.name).
|
|
Int("retry_count", retryCount).
|
|
Msg("Link not found (404). Marking link as invalid and regenerating")
|
|
|
|
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "link_not_found")
|
|
|
|
// Try to regenerate download link if we haven't exceeded retries
|
|
if retryCount < maxRetries {
|
|
// Clear cached link to force regeneration
|
|
f.downloadLink = ""
|
|
return true, nil
|
|
}
|
|
|
|
return false, &streamError{
|
|
Err: fmt.Errorf("file not found after %d retries", retryCount),
|
|
StatusCode: http.StatusNotFound,
|
|
}
|
|
|
|
default:
|
|
body, _ := io.ReadAll(resp.Body)
|
|
cleanupResp(resp)
|
|
|
|
_log.Error().
|
|
Int("status_code", resp.StatusCode).
|
|
Str("file", f.name).
|
|
Str("response_body", string(body)).
|
|
Msg("Unexpected upstream error")
|
|
|
|
return false, &streamError{
|
|
Err: fmt.Errorf("upstream error %d: %s", resp.StatusCode, string(body)),
|
|
StatusCode: http.StatusBadGateway,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w http.ResponseWriter) int {
|
|
rangeHeader := r.Header.Get("Range")
|
|
if rangeHeader == "" {
|
|
// For video files, apply byte range if exists
|
|
if byteRange, _ := f.getDownloadByteRange(); byteRange != nil {
|
|
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", byteRange[0], byteRange[1]))
|
|
}
|
|
return 0 // No range request
|
|
}
|
|
|
|
// Parse range request
|
|
ranges, err := parseRange(rangeHeader, f.size)
|
|
if err != nil || len(ranges) != 1 {
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", f.size))
|
|
return -1 // Invalid range
|
|
}
|
|
|
|
// Apply byte range offset if exists
|
|
byteRange, _ := f.getDownloadByteRange()
|
|
start, end := ranges[0].start, ranges[0].end
|
|
|
|
if byteRange != nil {
|
|
start += byteRange[0]
|
|
end += byteRange[0]
|
|
}
|
|
|
|
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
|
|
return 1 // Valid range request
|
|
}
|
|
|
|
/*
The methods below give the File type an os.File-like method set
(Stat, Read, Seek, Write, Readdir).
Only Stat and Readdir are used.
*/
|
|
|
|
func (f *File) Stat() (os.FileInfo, error) {
|
|
if f.isDir {
|
|
return &FileInfo{
|
|
name: f.name,
|
|
size: 0,
|
|
mode: 0755 | os.ModeDir,
|
|
modTime: f.modTime,
|
|
isDir: true,
|
|
}, nil
|
|
}
|
|
|
|
return &FileInfo{
|
|
name: f.name,
|
|
size: f.size,
|
|
mode: 0644,
|
|
modTime: f.modTime,
|
|
isDir: false,
|
|
}, nil
|
|
}
|
|
|
|
func (f *File) Read(p []byte) (n int, err error) {
|
|
if f.isDir {
|
|
return 0, os.ErrInvalid
|
|
}
|
|
|
|
if f.metadataOnly {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
// For preloaded content files (like version.txt)
|
|
if f.content != nil {
|
|
if f.readOffset >= int64(len(f.content)) {
|
|
return 0, io.EOF
|
|
}
|
|
n = copy(p, f.content[f.readOffset:])
|
|
f.readOffset += int64(n)
|
|
return n, nil
|
|
}
|
|
|
|
// For streaming files, return an error to force use of StreamResponse
|
|
return 0, fmt.Errorf("use StreamResponse method for streaming files")
|
|
}
|
|
|
|
func (f *File) Seek(offset int64, whence int) (int64, error) {
|
|
if f.isDir {
|
|
return 0, os.ErrInvalid
|
|
}
|
|
|
|
// Only handle seeking for preloaded content
|
|
if f.content != nil {
|
|
newOffset := f.readOffset
|
|
switch whence {
|
|
case io.SeekStart:
|
|
newOffset = offset
|
|
case io.SeekCurrent:
|
|
newOffset += offset
|
|
case io.SeekEnd:
|
|
newOffset = int64(len(f.content)) + offset
|
|
default:
|
|
return 0, os.ErrInvalid
|
|
}
|
|
|
|
if newOffset < 0 {
|
|
newOffset = 0
|
|
}
|
|
if newOffset > int64(len(f.content)) {
|
|
newOffset = int64(len(f.content))
|
|
}
|
|
|
|
f.readOffset = newOffset
|
|
return f.readOffset, nil
|
|
}
|
|
|
|
// For streaming files, return error to force use of StreamResponse
|
|
return 0, fmt.Errorf("use StreamResponse method for streaming files")
|
|
}
|
|
|
|
func (f *File) Write(p []byte) (n int, err error) {
|
|
return 0, os.ErrPermission
|
|
}
|
|
|
|
func (f *File) Readdir(count int) ([]os.FileInfo, error) {
|
|
if !f.isDir {
|
|
return nil, os.ErrInvalid
|
|
}
|
|
|
|
if count <= 0 {
|
|
return f.children, nil
|
|
}
|
|
|
|
if len(f.children) == 0 {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
if count > len(f.children) {
|
|
count = len(f.children)
|
|
}
|
|
|
|
files := f.children[:count]
|
|
f.children = f.children[count:]
|
|
return files, nil
|
|
}
|