experimental: migrate torrent cache from per-torrent JSON files to BadgerDB

This commit is contained in:
Mukhtar Akere
2025-03-15 21:08:15 +01:00
parent b4e4db27fb
commit 2d29996d2c
11 changed files with 281 additions and 127 deletions

View File

@@ -1,15 +1,17 @@
package cache
import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/dgraph-io/badger/v4"
"github.com/rs/zerolog"
"github.com/sirrobot01/debrid-blackhole/internal/logger"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/sirrobot01/debrid-blackhole/internal/config"
@@ -35,17 +37,45 @@ var (
func getLogger() zerolog.Logger {
once.Do(func() {
_logInstance = logger.NewLogger("cache", "info", os.Stdout)
cfg := config.GetConfig()
_logInstance = logger.NewLogger("cache", cfg.LogLevel, os.Stdout)
})
return _logInstance
}
type Cache struct {
dir string
client engine.Service
torrents *sync.Map // key: torrent.Id, value: *CachedTorrent
torrentsNames *sync.Map // key: torrent.Name, value: torrent.Id
LastUpdated time.Time `json:"last_updated"`
dir string
client engine.Service
db *badger.DB
torrents map[string]*CachedTorrent // key: torrent.Id, value: *CachedTorrent
torrentsMutex sync.RWMutex
torrentsNames map[string]*CachedTorrent // key: torrent.Name, value: torrent
torrentNamesMutex sync.RWMutex
LastUpdated time.Time `json:"last_updated"`
}
// SetTorrent stores t in the id-keyed torrent map, guarded by the write lock.
func (c *Cache) SetTorrent(t *CachedTorrent) {
	c.torrentsMutex.Lock()
	c.torrents[t.Id] = t
	c.torrentsMutex.Unlock()
}
// SetTorrentName records t under the given name in the name-keyed map,
// guarded by the write lock.
func (c *Cache) SetTorrentName(name string, t *CachedTorrent) {
	c.torrentNamesMutex.Lock()
	c.torrentsNames[name] = t
	c.torrentNamesMutex.Unlock()
}
// GetTorrents returns a snapshot copy of the id-keyed torrent map.
//
// A copy is returned instead of the live map: the RLock is released when
// this method returns, so handing out the internal map would let callers
// iterate it concurrently with SetTorrent writes — a data race. The copy
// is shallow; the *CachedTorrent values are shared with the cache.
func (c *Cache) GetTorrents() map[string]*CachedTorrent {
	c.torrentsMutex.RLock()
	defer c.torrentsMutex.RUnlock()
	snapshot := make(map[string]*CachedTorrent, len(c.torrents))
	for id, t := range c.torrents {
		snapshot[id] = t
	}
	return snapshot
}
// GetTorrentNames returns a snapshot copy of the name-keyed torrent map.
//
// A copy is returned instead of the live map: the RLock is released when
// this method returns, so handing out the internal map would let callers
// iterate it concurrently with SetTorrentName writes — a data race. The
// copy is shallow; the *CachedTorrent values are shared with the cache.
func (c *Cache) GetTorrentNames() map[string]*CachedTorrent {
	c.torrentNamesMutex.RLock()
	defer c.torrentNamesMutex.RUnlock()
	snapshot := make(map[string]*CachedTorrent, len(c.torrentsNames))
	for name, t := range c.torrentsNames {
		snapshot[name] = t
	}
	return snapshot
}
type Manager struct {
@@ -73,10 +103,11 @@ func (m *Manager) GetCache(debridName string) *Cache {
}
func New(debridService engine.Service, basePath string) *Cache {
dbPath := filepath.Join(basePath, "cache", debridService.GetName(), "db")
return &Cache{
dir: filepath.Join(basePath, "cache", debridService.GetName(), "torrents"),
torrents: &sync.Map{},
torrentsNames: &sync.Map{},
dir: dbPath,
torrents: make(map[string]*CachedTorrent),
torrentsNames: make(map[string]*CachedTorrent),
client: debridService,
}
}
@@ -84,93 +115,117 @@ func New(debridService engine.Service, basePath string) *Cache {
// Start opens the on-disk BadgerDB store for this debrid client, loads the
// persisted torrents into memory, then syncs against the remote service.
// On Load/Sync failure the DB is closed again so a failed Start does not
// leak Badger's directory lock and file handles.
func (c *Cache) Start() error {
	_logger := getLogger()
	_logger.Info().Msg("Starting cache for: " + c.client.GetName())

	// Make sure the directory exists
	if err := os.MkdirAll(c.dir, 0755); err != nil {
		return fmt.Errorf("failed to create cache directory: %w", err)
	}

	// Open BadgerDB
	opts := badger.DefaultOptions(c.dir)
	opts.Logger = nil // Disable Badger's internal logger

	var err error
	c.db, err = badger.Open(opts)
	if err != nil {
		return fmt.Errorf("failed to open BadgerDB: %w", err)
	}

	if err := c.Load(); err != nil {
		_ = c.Close() // best-effort: release Badger's dir lock on failure
		return fmt.Errorf("failed to load cache: %w", err)
	}
	if err := c.Sync(); err != nil {
		_ = c.Close() // best-effort: release Badger's dir lock on failure
		return fmt.Errorf("failed to sync cache: %w", err)
	}
	return nil
}
// Close shuts down the underlying BadgerDB handle, if one was opened.
// It is a no-op on a cache that was never started.
func (c *Cache) Close() error {
	if c.db == nil {
		return nil
	}
	return c.db.Close()
}
func (c *Cache) Load() error {
_logger := getLogger()
if err := os.MkdirAll(c.dir, 0755); err != nil {
return fmt.Errorf("failed to create cache directory: %w", err)
}
err := c.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()
files, err := os.ReadDir(c.dir)
if err != nil {
return fmt.Errorf("failed to read cache directory: %w", err)
}
prefix := []byte("torrent:")
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
for _, file := range files {
if file.IsDir() || filepath.Ext(file.Name()) != ".json" {
continue
err := item.Value(func(val []byte) error {
var ct CachedTorrent
if err := json.Unmarshal(val, &ct); err != nil {
_logger.Debug().Err(err).Msgf("Failed to unmarshal torrent")
return nil // Continue to next item
}
if len(ct.Files) > 0 {
c.SetTorrent(&ct)
c.SetTorrentName(ct.Name, &ct)
}
return nil
})
if err != nil {
_logger.Debug().Err(err).Msg("Error reading torrent value")
}
}
return nil
})
filePath := filepath.Join(c.dir, file.Name())
data, err := os.ReadFile(filePath)
if err != nil {
_logger.Debug().Err(err).Msgf("Failed to read file: %s", filePath)
continue
}
var ct CachedTorrent
if err := json.Unmarshal(data, &ct); err != nil {
_logger.Debug().Err(err).Msgf("Failed to unmarshal file: %s", filePath)
continue
}
if len(ct.Files) > 0 {
c.torrents.Store(ct.Torrent.Id, &ct)
c.torrentsNames.Store(ct.Torrent.Name, ct.Torrent.Id)
}
}
return nil
return err
}
func (c *Cache) GetTorrent(id string) *CachedTorrent {
if value, ok := c.torrents.Load(id); ok {
return value.(*CachedTorrent)
if t, ok := c.GetTorrents()[id]; ok {
return t
}
return nil
}
func (c *Cache) GetTorrentByName(name string) *CachedTorrent {
if id, ok := c.torrentsNames.Load(name); ok {
return c.GetTorrent(id.(string))
if t, ok := c.GetTorrentNames()[name]; ok {
return t
}
return nil
}
func (c *Cache) SaveTorrent(ct *CachedTorrent) error {
data, err := json.MarshalIndent(ct, "", " ")
data, err := json.Marshal(ct)
if err != nil {
return fmt.Errorf("failed to marshal torrent: %w", err)
}
fileName := ct.Torrent.Id + ".json"
filePath := filepath.Join(c.dir, fileName)
tmpFile := filePath + ".tmp"
key := []byte(fmt.Sprintf("torrent:%s", ct.Torrent.Id))
err = c.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data)
})
f, err := os.Create(tmpFile)
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer f.Close()
w := bufio.NewWriter(f)
if _, err := w.Write(data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
return fmt.Errorf("failed to save torrent to BadgerDB: %w", err)
}
if err := w.Flush(); err != nil {
return fmt.Errorf("failed to flush data: %w", err)
// Also create an index by name for quick lookups
nameKey := []byte(fmt.Sprintf("name:%s", ct.Torrent.Name))
err = c.db.Update(func(txn *badger.Txn) error {
return txn.Set(nameKey, []byte(ct.Torrent.Id))
})
if err != nil {
return fmt.Errorf("failed to save torrent name index: %w", err)
}
return os.Rename(tmpFile, filePath)
return nil
}
func (c *Cache) SaveAll() error {
@@ -192,14 +247,23 @@ func (c *Cache) SaveAll() error {
}()
}
c.torrents.Range(func(_, value interface{}) bool {
tasks <- value.(*CachedTorrent)
return true
})
for _, value := range c.GetTorrents() {
tasks <- value
}
close(tasks)
wg.Wait()
c.LastUpdated = time.Now()
// Run value log garbage collection when appropriate
// This helps reclaim space from deleted/updated values
go func() {
err := c.db.RunValueLogGC(0.5) // Run GC if 50% of the value log can be discarded
if err != nil && err != badger.ErrNoRewrite {
_logger.Debug().Err(err).Msg("BadgerDB value log GC")
}
}()
return nil
}
@@ -209,44 +273,76 @@ func (c *Cache) Sync() error {
if err != nil {
return fmt.Errorf("failed to sync torrents: %v", err)
}
_logger.Info().Msgf("Syncing %d torrents", len(torrents))
workers := runtime.NumCPU() * 200
workChan := make(chan *torrent.Torrent, len(torrents))
errChan := make(chan error, len(torrents))
// Calculate optimal workers - balance between CPU and IO
workers := runtime.NumCPU() * 4 // A more balanced multiplier for BadgerDB
// Create channels with appropriate buffering
workChan := make(chan *torrent.Torrent, workers*2)
// Use an atomic counter for progress tracking
var processed int64
var errorCount int64
// Create a context with cancellation in case of critical errors
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a wait group for workers
var wg sync.WaitGroup
// Start workers
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for t := range workChan {
if err := c.processTorrent(t); err != nil {
errChan <- err
for {
select {
case t, ok := <-workChan:
if !ok {
return // Channel closed, exit goroutine
}
if err := c.processTorrent(t); err != nil {
_logger.Error().Err(err).Str("torrent", t.Name).Msg("sync error")
atomic.AddInt64(&errorCount, 1)
}
count := atomic.AddInt64(&processed, 1)
if count%1000 == 0 {
_logger.Info().Msgf("Progress: %d/%d torrents processed", count, len(torrents))
}
case <-ctx.Done():
return // Context cancelled, exit goroutine
}
}
}()
}
// Feed work to workers
for _, t := range torrents {
workChan <- t
select {
case workChan <- t:
// Work sent successfully
case <-ctx.Done():
break // Context cancelled
}
}
// Signal workers that no more work is coming
close(workChan)
// Wait for all workers to complete
wg.Wait()
close(errChan)
for err := range errChan {
_logger.Error().Err(err).Msg("sync error")
}
_logger.Info().Msgf("Synced %d torrents", len(torrents))
_logger.Info().Msgf("Sync complete: %d torrents processed, %d errors", len(torrents), errorCount)
return nil
}
func (c *Cache) processTorrent(t *torrent.Torrent) error {
if existing, ok := c.torrents.Load(t.Id); ok {
ct := existing.(*CachedTorrent)
if ct := c.GetTorrent(t.Id); ct != nil {
if ct.IsComplete {
return nil
}
@@ -259,7 +355,7 @@ func (c *Cache) AddTorrent(t *torrent.Torrent) {
_logger := getLogger()
if len(t.Files) == 0 {
tNew, err := c.client.GetTorrent(t.Id)
tNew, err := c.client.GetTorrent(t)
_logger.Debug().Msgf("Getting torrent files for %s", t.Id)
if err != nil {
_logger.Debug().Msgf("Failed to get torrent files for %s: %v", t.Id, err)
@@ -280,8 +376,8 @@ func (c *Cache) AddTorrent(t *torrent.Torrent) {
DownloadLinks: make(map[string]DownloadLinkCache),
}
c.torrents.Store(t.Id, ct)
c.torrentsNames.Store(t.Name, t.Id)
c.SetTorrent(ct)
c.SetTorrentName(t.Name, ct)
go func() {
if err := c.SaveTorrent(ct); err != nil {
@@ -290,12 +386,12 @@ func (c *Cache) AddTorrent(t *torrent.Torrent) {
}()
}
func (c *Cache) RefreshTorrent(torrentId string) *CachedTorrent {
func (c *Cache) RefreshTorrent(torrent *CachedTorrent) *CachedTorrent {
_logger := getLogger()
t, err := c.client.GetTorrent(torrentId)
t, err := c.client.GetTorrent(torrent.Torrent)
if err != nil {
_logger.Debug().Msgf("Failed to get torrent files for %s: %v", torrentId, err)
_logger.Debug().Msgf("Failed to get torrent files for %s: %v", torrent.Id, err)
return nil
}
if len(t.Files) == 0 {
@@ -309,8 +405,8 @@ func (c *Cache) RefreshTorrent(torrentId string) *CachedTorrent {
DownloadLinks: make(map[string]DownloadLinkCache),
}
c.torrents.Store(t.Id, ct)
c.torrentsNames.Store(t.Name, t.Id)
c.SetTorrent(ct)
c.SetTorrentName(t.Name, ct)
go func() {
if err := c.SaveTorrent(ct); err != nil {
@@ -329,7 +425,7 @@ func (c *Cache) GetFileDownloadLink(t *CachedTorrent, file *torrent.File) (strin
}
if file.Link == "" {
t = c.RefreshTorrent(t.Id)
t = c.RefreshTorrent(t)
if t == nil {
return "", fmt.Errorf("torrent not found")
}
@@ -354,7 +450,3 @@ func (c *Cache) GetFileDownloadLink(t *CachedTorrent, file *torrent.File) (strin
return link.DownloadLink, nil
}
func (c *Cache) GetTorrents() *sync.Map {
return c.torrents
}