Fix issues with repair, move to a different streaming option

This commit is contained in:
Mukhtar Akere
2025-06-18 10:42:44 +01:00
parent 5661b05ec1
commit b2e99585f7
6 changed files with 507 additions and 275 deletions

View File

@@ -27,35 +27,52 @@ var sharedClient = &http.Client{
Timeout: 0,
}
type streamError struct {
Err error
StatusCode int
IsClientDisconnection bool
}
func (e *streamError) Error() string {
return e.Err.Error()
}
func (e *streamError) Unwrap() error {
return e.Err
}
type File struct {
cache *store.Cache
fileId string
torrentName string
modTime time.Time
size int64
offset int64
isDir bool
children []os.FileInfo
reader io.ReadCloser
seekPending bool
content []byte
isRar bool
name string
metadataOnly bool
downloadLink string
torrentName string
link string
downloadLink string
size int64
isDir bool
fileId string
isRar bool
metadataOnly bool
content []byte
children []os.FileInfo // For directories
cache *store.Cache
modTime time.Time
// Minimal state for interface compliance only
readOffset int64 // Only used for Read() method compliance
}
// File interface implementations for File
func (f *File) Close() error {
if f.reader != nil {
f.reader.Close()
f.reader = nil
if f.isDir {
return nil // No resources to close for directories
}
// For files, we don't have any resources to close either
// This is just to satisfy the os.File interface
f.content = nil
f.children = nil
f.downloadLink = ""
f.readOffset = 0
return nil
}
@@ -84,211 +101,274 @@ func (f *File) getDownloadByteRange() (*[2]int64, error) {
return byteRange, nil
}
func (f *File) stream() (*http.Response, error) {
client := sharedClient
func (f *File) servePreloadedContent(w http.ResponseWriter, r *http.Request) error {
content := f.content
size := int64(len(content))
// Handle range requests for preloaded content
if rangeHeader := r.Header.Get("Range"); rangeHeader != "" {
ranges, err := parseRange(rangeHeader, size)
if err != nil || len(ranges) != 1 {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", size))
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
}
start, end := ranges[0].start, ranges[0].end
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusPartialContent)
_, err = w.Write(content[start : end+1])
return err
}
// Full content
w.Header().Set("Content-Length", fmt.Sprintf("%d", size))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusOK)
_, err := w.Write(content)
return err
}
func (f *File) StreamResponse(w http.ResponseWriter, r *http.Request) error {
// Handle preloaded content files
if f.content != nil {
return f.servePreloadedContent(w, r)
}
// Try streaming with retry logic
return f.streamWithRetry(w, r, 0)
}
func (f *File) streamWithRetry(w http.ResponseWriter, r *http.Request, retryCount int) error {
const maxRetries = 0
_log := f.cache.Logger()
// Get download link (with caching optimization)
downloadLink, err := f.getDownloadLink()
if err != nil {
_log.Trace().Msgf("Failed to get download link for %s: %v", f.name, err)
return nil, err
return &streamError{Err: err, StatusCode: http.StatusPreconditionFailed}
}
if downloadLink == "" {
_log.Trace().Msgf("Failed to get download link for %s. Empty download link", f.name)
return nil, fmt.Errorf("empty download link")
return &streamError{Err: fmt.Errorf("empty download link"), StatusCode: http.StatusNotFound}
}
byteRange, err := f.getDownloadByteRange()
// Create upstream request with streaming optimizations
upstreamReq, err := http.NewRequest("GET", downloadLink, nil)
if err != nil {
_log.Trace().Msgf("Failed to get download byte range for %s: %v", f.name, err)
return nil, err
return &streamError{Err: err, StatusCode: http.StatusInternalServerError}
}
req, err := http.NewRequest("GET", downloadLink, nil)
setVideoStreamingHeaders(upstreamReq)
// Handle range requests (critical for video seeking)
isRangeRequest := f.handleRangeRequest(upstreamReq, r, w)
if isRangeRequest == -1 {
return &streamError{Err: fmt.Errorf("invalid range"), StatusCode: http.StatusRequestedRangeNotSatisfiable}
}
resp, err := sharedClient.Do(upstreamReq)
if err != nil {
_log.Trace().Msgf("Failed to create HTTP request: %v", err)
return nil, err
return &streamError{Err: err, StatusCode: http.StatusServiceUnavailable}
}
defer resp.Body.Close()
// Handle upstream errors with retry logic
shouldRetry, retryErr := f.handleUpstream(resp, retryCount, maxRetries)
if shouldRetry && retryCount < maxRetries {
// Retry with new download link
_log.Debug().
Int("retry_count", retryCount+1).
Str("file", f.name).
Msg("Retrying stream request")
return f.streamWithRetry(w, r, retryCount+1)
}
if retryErr != nil {
return retryErr
}
if byteRange == nil {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", max(0, f.offset)))
} else {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", byteRange[0]+max(0, f.offset)))
}
setVideoResponseHeaders(w, resp, isRangeRequest == 1)
// Make the request
resp, err := client.Do(req)
if err != nil {
_log.Trace().Msgf("HTTP request failed: %v", err)
return nil, err
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
f.downloadLink = ""
cleanupResp := func(resp *http.Response) {
if resp.Body != nil {
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
}
switch resp.StatusCode {
case http.StatusServiceUnavailable:
// Read the body to check for specific error messages
body, readErr := io.ReadAll(resp.Body)
cleanupResp(resp)
if readErr != nil {
_log.Trace().Msgf("Failed to read response body: %v", readErr)
return nil, fmt.Errorf("failed to read error response: %w", readErr)
}
bodyStr := string(body)
if strings.Contains(bodyStr, "You can not download this file because you have exceeded your traffic on this hoster") {
_log.Trace().Msgf("Bandwidth exceeded for %s. Download token will be disabled if you have more than one", f.name)
f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "bandwidth_exceeded")
// Retry with a different API key if it's available
return f.stream()
}
return nil, fmt.Errorf("service unavailable: %s", bodyStr)
case http.StatusNotFound:
cleanupResp(resp)
// Mark download link as not found
// Regenerate a new download link
_log.Trace().Msgf("Link not found (404) for %s. Marking link as invalid and regenerating", f.name)
f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, "link_not_found")
// Generate a new download link
downloadLink, err := f.getDownloadLink()
if err != nil {
_log.Trace().Msgf("Failed to get download link for %s. %s", f.name, err)
return nil, err
}
if downloadLink == "" {
_log.Trace().Msgf("Failed to get download link for %s", f.name)
return nil, fmt.Errorf("failed to regenerate download link")
}
req, err := http.NewRequest("GET", downloadLink, nil)
if err != nil {
return nil, err
}
// Set the range header again
if byteRange == nil {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", max(0, f.offset)))
} else {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", byteRange[0]+max(0, f.offset)))
}
newResp, err := client.Do(req)
if err != nil {
return nil, err
}
if newResp.StatusCode != http.StatusOK && newResp.StatusCode != http.StatusPartialContent {
cleanupResp(newResp)
_log.Trace().Msgf("Regenerated link also failed with status %d", newResp.StatusCode)
f.cache.MarkDownloadLinkAsInvalid(f.link, downloadLink, newResp.Status)
return nil, fmt.Errorf("failed with status code %d even after link regeneration", newResp.StatusCode)
}
return newResp, nil
default:
body, _ := io.ReadAll(resp.Body)
cleanupResp(resp)
_log.Trace().Msgf("Unexpected status code %d for %s: %s", resp.StatusCode, f.name, string(body))
return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
}
}
return resp, nil
// Stream with optimized buffering for video
return f.streamVideoOptimized(w, resp.Body)
}
func (f *File) Read(p []byte) (n int, err error) {
if f.isDir {
return 0, os.ErrInvalid
}
if f.metadataOnly {
return 0, io.EOF
}
if f.content != nil {
if f.offset >= int64(len(f.content)) {
return 0, io.EOF
}
n = copy(p, f.content[f.offset:])
f.offset += int64(n)
return n, nil
func (f *File) handleUpstream(resp *http.Response, retryCount, maxRetries int) (shouldRetry bool, err error) {
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent {
return false, nil
}
// If we haven't started streaming the file yet or need to reposition
if f.reader == nil || f.seekPending {
if f.reader != nil {
f.reader.Close()
f.reader = nil
_log := f.cache.Logger()
// Clean up response body properly
cleanupResp := func(resp *http.Response) {
if resp.Body != nil {
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
}
switch resp.StatusCode {
case http.StatusServiceUnavailable:
// Read the body to check for specific error messages
body, readErr := io.ReadAll(resp.Body)
cleanupResp(resp)
if readErr != nil {
_log.Error().Err(readErr).Msg("Failed to read response body")
return false, &streamError{
Err: fmt.Errorf("failed to read error response: %w", readErr),
StatusCode: http.StatusServiceUnavailable,
}
}
// Make the request to get the file
resp, err := f.stream()
if err != nil {
return 0, err
}
if resp == nil {
return 0, fmt.Errorf("stream returned nil response")
bodyStr := string(body)
if strings.Contains(bodyStr, "you have exceeded your traffic") {
_log.Debug().
Str("file", f.name).
Int("retry_count", retryCount).
Msg("Bandwidth exceeded. Marking link as invalid")
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "bandwidth_exceeded")
// Retry with a different API key if available and we haven't exceeded retries
if retryCount < maxRetries {
return true, nil
}
return false, &streamError{
Err: fmt.Errorf("bandwidth exceeded after %d retries", retryCount),
StatusCode: http.StatusServiceUnavailable,
}
}
f.reader = resp.Body
f.seekPending = false
}
return false, &streamError{
Err: fmt.Errorf("service unavailable: %s", bodyStr),
StatusCode: http.StatusServiceUnavailable,
}
n, err = f.reader.Read(p)
f.offset += int64(n)
case http.StatusNotFound:
cleanupResp(resp)
if err != nil {
f.reader.Close()
f.reader = nil
}
_log.Debug().
Str("file", f.name).
Int("retry_count", retryCount).
Msg("Link not found (404). Marking link as invalid and regenerating")
return n, err
}
f.cache.MarkDownloadLinkAsInvalid(f.link, f.downloadLink, "link_not_found")
func (f *File) Seek(offset int64, whence int) (int64, error) {
if f.isDir {
return 0, os.ErrInvalid
}
// Try to regenerate download link if we haven't exceeded retries
if retryCount < maxRetries {
// Clear cached link to force regeneration
f.downloadLink = ""
return true, nil
}
return false, &streamError{
Err: fmt.Errorf("file not found after %d retries", retryCount),
StatusCode: http.StatusNotFound,
}
newOffset := f.offset
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset += offset
case io.SeekEnd:
newOffset = f.size + offset
default:
return 0, os.ErrInvalid
}
body, _ := io.ReadAll(resp.Body)
cleanupResp(resp)
if newOffset < 0 {
newOffset = 0
}
if newOffset > f.size {
newOffset = f.size
}
_log.Error().
Int("status_code", resp.StatusCode).
Str("file", f.name).
Str("response_body", string(body)).
Msg("Unexpected upstream error")
// Only mark seek as pending if position actually changed
if newOffset != f.offset {
f.offset = newOffset
f.seekPending = true
return false, &streamError{
Err: fmt.Errorf("upstream error %d: %s", resp.StatusCode, string(body)),
StatusCode: http.StatusBadGateway,
}
}
return f.offset, nil
}
func (f *File) handleRangeRequest(upstreamReq *http.Request, r *http.Request, w http.ResponseWriter) int {
rangeHeader := r.Header.Get("Range")
if rangeHeader == "" {
// For video files, apply byte range if exists
if byteRange, _ := f.getDownloadByteRange(); byteRange != nil {
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", byteRange[0], byteRange[1]))
}
return 0 // No range request
}
// Parse range request
ranges, err := parseRange(rangeHeader, f.size)
if err != nil || len(ranges) != 1 {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", f.size))
return -1 // Invalid range
}
// Apply byte range offset if exists
byteRange, _ := f.getDownloadByteRange()
start, end := ranges[0].start, ranges[0].end
if byteRange != nil {
start += byteRange[0]
end += byteRange[0]
}
upstreamReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
return 1 // Valid range request
}
func (f *File) streamVideoOptimized(w http.ResponseWriter, src io.Reader) error {
// Use larger buffer for video streaming (better throughput)
buf := make([]byte, 64*1024) // 64KB buffer
// First chunk optimization - send immediately for faster start
n, err := src.Read(buf)
if err != nil && err != io.EOF {
if isClientDisconnection(err) {
return &streamError{Err: err, StatusCode: 0, IsClientDisconnection: true}
}
return &streamError{Err: err, StatusCode: 0}
}
if n > 0 {
// Write first chunk immediately
_, writeErr := w.Write(buf[:n])
if writeErr != nil {
if isClientDisconnection(writeErr) {
return &streamError{Err: writeErr, StatusCode: 0, IsClientDisconnection: true}
}
return &streamError{Err: writeErr, StatusCode: 0}
}
// Flush immediately for faster video start
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
if err == io.EOF {
return nil
}
// Continue with optimized copy for remaining data
_, err = io.CopyBuffer(w, src, buf)
if err != nil {
if isClientDisconnection(err) {
return &streamError{Err: err, StatusCode: 0, IsClientDisconnection: true}
}
return &streamError{Err: err, StatusCode: 0}
}
return nil
}
/*
These are the methods that implement the os.File interface for the File type.
Only Stat and ReadDir are used
*/
func (f *File) Stat() (os.FileInfo, error) {
if f.isDir {
return &FileInfo{
@@ -309,18 +389,61 @@ func (f *File) Stat() (os.FileInfo, error) {
}, nil
}
func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
// Save current position
// Seek to requested position
_, err = f.Seek(off, io.SeekStart)
if err != nil {
return 0, err
func (f *File) Read(p []byte) (n int, err error) {
if f.isDir {
return 0, os.ErrInvalid
}
// Read the data
n, err = f.Read(p)
return n, err
if f.metadataOnly {
return 0, io.EOF
}
// For preloaded content files (like version.txt)
if f.content != nil {
if f.readOffset >= int64(len(f.content)) {
return 0, io.EOF
}
n = copy(p, f.content[f.readOffset:])
f.readOffset += int64(n)
return n, nil
}
// For streaming files, return an error to force use of StreamResponse
return 0, fmt.Errorf("use StreamResponse method for streaming files")
}
func (f *File) Seek(offset int64, whence int) (int64, error) {
if f.isDir {
return 0, os.ErrInvalid
}
// Only handle seeking for preloaded content
if f.content != nil {
newOffset := f.readOffset
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset += offset
case io.SeekEnd:
newOffset = int64(len(f.content)) + offset
default:
return 0, os.ErrInvalid
}
if newOffset < 0 {
newOffset = 0
}
if newOffset > int64(len(f.content)) {
newOffset = int64(len(f.content))
}
f.readOffset = newOffset
return f.readOffset, nil
}
// For streaming files, return error to force use of StreamResponse
return 0, fmt.Errorf("use StreamResponse method for streaming files")
}
func (f *File) Write(p []byte) (n int, err error) {