Changelog: 0.1.3
This commit is contained in:
+34
-19
@@ -12,6 +12,21 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type Blackhole struct {
|
||||
config *common.Config
|
||||
deb debrid.Service
|
||||
cache *common.Cache
|
||||
}
|
||||
|
||||
func NewBlackhole(config *common.Config, deb debrid.Service, cache *common.Cache) *Blackhole {
|
||||
return &Blackhole{
|
||||
config: config,
|
||||
deb: deb,
|
||||
cache: cache,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func fileReady(path string) bool {
|
||||
_, err := os.Stat(path)
|
||||
return !os.IsNotExist(err) // Returns true if the file exists
|
||||
@@ -33,7 +48,7 @@ func checkFileLoop(wg *sync.WaitGroup, dir string, file debrid.TorrentFile, read
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessFiles(arr *debrid.Arr, torrent *debrid.Torrent) {
|
||||
func (b *Blackhole) processFiles(arr *debrid.Arr, torrent *debrid.Torrent) {
|
||||
var wg sync.WaitGroup
|
||||
files := torrent.Files
|
||||
ready := make(chan debrid.TorrentFile, len(files))
|
||||
@@ -52,29 +67,29 @@ func ProcessFiles(arr *debrid.Arr, torrent *debrid.Torrent) {
|
||||
|
||||
for r := range ready {
|
||||
log.Println("File is ready:", r.Name)
|
||||
CreateSymLink(arr, torrent)
|
||||
b.createSymLink(arr, torrent)
|
||||
|
||||
}
|
||||
go torrent.Cleanup(true)
|
||||
fmt.Printf("%s downloaded", torrent.Name)
|
||||
}
|
||||
|
||||
func CreateSymLink(config *debrid.Arr, torrent *debrid.Torrent) {
|
||||
path := filepath.Join(config.CompletedFolder, torrent.Folder)
|
||||
func (b *Blackhole) createSymLink(arr *debrid.Arr, torrent *debrid.Torrent) {
|
||||
path := filepath.Join(arr.CompletedFolder, torrent.Folder)
|
||||
err := os.MkdirAll(path, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Printf("Failed to create directory: %s\n", path)
|
||||
}
|
||||
for _, file := range torrent.Files {
|
||||
// Combine the directory and filename to form a full path
|
||||
fullPath := filepath.Join(config.CompletedFolder, file.Path)
|
||||
fullPath := filepath.Join(arr.CompletedFolder, file.Path)
|
||||
|
||||
// Create a symbolic link if file doesn't exist
|
||||
_ = os.Symlink(filepath.Join(config.Debrid.Folder, file.Path), fullPath)
|
||||
_ = os.Symlink(filepath.Join(arr.Debrid.Folder, file.Path), fullPath)
|
||||
}
|
||||
}
|
||||
|
||||
func watchFiles(watcher *fsnotify.Watcher, events map[string]time.Time) {
|
||||
func watcher(watcher *fsnotify.Watcher, events map[string]time.Time) {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-watcher.Events:
|
||||
@@ -96,7 +111,7 @@ func watchFiles(watcher *fsnotify.Watcher, events map[string]time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
func processFilesDebounced(arr *debrid.Arr, db debrid.Service, events map[string]time.Time, debouncePeriod time.Duration) {
|
||||
func (b *Blackhole) processFilesDebounced(arr *debrid.Arr, events map[string]time.Time, debouncePeriod time.Duration) {
|
||||
ticker := time.NewTicker(1 * time.Second) // Check every second
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -105,7 +120,7 @@ func processFilesDebounced(arr *debrid.Arr, db debrid.Service, events map[string
|
||||
if time.Since(lastEventTime) >= debouncePeriod {
|
||||
log.Printf("Torrent file detected: %s", file)
|
||||
// Process the torrent file
|
||||
torrent, err := db.Process(arr, file)
|
||||
torrent, err := b.deb.Process(arr, file)
|
||||
if err != nil && torrent != nil {
|
||||
// remove torrent file
|
||||
torrent.Cleanup(true)
|
||||
@@ -113,7 +128,7 @@ func processFilesDebounced(arr *debrid.Arr, db debrid.Service, events map[string
|
||||
log.Printf("Error processing torrent file: %s", err)
|
||||
}
|
||||
if err == nil && torrent != nil && len(torrent.Files) > 0 {
|
||||
go ProcessFiles(arr, torrent)
|
||||
go b.processFiles(arr, torrent)
|
||||
}
|
||||
delete(events, file) // remove file from channel
|
||||
|
||||
@@ -122,8 +137,8 @@ func processFilesDebounced(arr *debrid.Arr, db debrid.Service, events map[string
|
||||
}
|
||||
}
|
||||
|
||||
func StartArr(conf *debrid.Arr, db debrid.Service) {
|
||||
log.Printf("Watching: %s", conf.WatchFolder)
|
||||
func (b *Blackhole) startArr(arr *debrid.Arr) {
|
||||
log.Printf("Watching: %s", arr.WatchFolder)
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
@@ -136,19 +151,19 @@ func StartArr(conf *debrid.Arr, db debrid.Service) {
|
||||
}(w)
|
||||
events := make(map[string]time.Time)
|
||||
|
||||
go watchFiles(w, events)
|
||||
if err = w.Add(conf.WatchFolder); err != nil {
|
||||
go watcher(w, events)
|
||||
if err = w.Add(arr.WatchFolder); err != nil {
|
||||
log.Println("Error Watching folder:", err)
|
||||
return
|
||||
}
|
||||
|
||||
processFilesDebounced(conf, db, events, 1*time.Second)
|
||||
b.processFilesDebounced(arr, events, 1*time.Second)
|
||||
}
|
||||
|
||||
func StartBlackhole(config *common.Config, deb debrid.Service) {
|
||||
func (b *Blackhole) Start() {
|
||||
log.Println("[*] Starting Blackhole")
|
||||
var wg sync.WaitGroup
|
||||
for _, conf := range config.Arrs {
|
||||
for _, conf := range b.config.Arrs {
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
headers := map[string]string{
|
||||
@@ -157,14 +172,14 @@ func StartBlackhole(config *common.Config, deb debrid.Service) {
|
||||
client := common.NewRLHTTPClient(nil, headers)
|
||||
|
||||
arr := &debrid.Arr{
|
||||
Debrid: config.Debrid,
|
||||
Debrid: b.config.Debrid,
|
||||
WatchFolder: conf.WatchFolder,
|
||||
CompletedFolder: conf.CompletedFolder,
|
||||
Token: conf.Token,
|
||||
URL: conf.URL,
|
||||
Client: client,
|
||||
}
|
||||
go StartArr(arr, deb)
|
||||
go b.startArr(arr)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
+8
-4
@@ -1,19 +1,22 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"goBlack/common"
|
||||
"goBlack/debrid"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func Start(config *common.Config) {
|
||||
|
||||
deb := debrid.NewDebrid(config.Debrid)
|
||||
maxCacheSize := cmp.Or(config.MaxCacheSize, 1000)
|
||||
cache := common.NewCache(maxCacheSize)
|
||||
|
||||
deb := debrid.NewDebrid(config.Debrid, cache)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if config.Proxy.Enabled {
|
||||
proxy := NewProxy(*config, deb)
|
||||
proxy := NewProxy(*config, deb, cache)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -22,10 +25,11 @@ func Start(config *common.Config) {
|
||||
}
|
||||
|
||||
if len(config.Arrs) > 0 {
|
||||
blackhole := NewBlackhole(config, deb, cache)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
StartBlackhole(config, deb)
|
||||
blackhole.Start()
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
+99
-75
@@ -16,6 +16,7 @@ import (
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type RSS struct {
|
||||
@@ -70,26 +71,28 @@ type TorznabAttr struct {
|
||||
}
|
||||
|
||||
type Proxy struct {
|
||||
Port string `json:"port"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Debug bool `json:"debug"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
CachedOnly bool `json:"cached_only"`
|
||||
Debrid debrid.Service
|
||||
port string
|
||||
enabled bool
|
||||
debug bool
|
||||
username string
|
||||
password string
|
||||
cachedOnly bool
|
||||
debrid debrid.Service
|
||||
cache *common.Cache
|
||||
}
|
||||
|
||||
func NewProxy(config common.Config, deb debrid.Service) *Proxy {
|
||||
func NewProxy(config common.Config, deb debrid.Service, cache *common.Cache) *Proxy {
|
||||
cfg := config.Proxy
|
||||
port := cmp.Or(os.Getenv("PORT"), cfg.Port, "8181")
|
||||
return &Proxy{
|
||||
Port: port,
|
||||
Enabled: cfg.Enabled,
|
||||
Debug: cfg.Debug,
|
||||
Username: cfg.Username,
|
||||
Password: cfg.Password,
|
||||
CachedOnly: cfg.CachedOnly,
|
||||
Debrid: deb,
|
||||
port: port,
|
||||
enabled: cfg.Enabled,
|
||||
debug: cfg.Debug,
|
||||
username: cfg.Username,
|
||||
password: cfg.Password,
|
||||
cachedOnly: cfg.CachedOnly,
|
||||
debrid: deb,
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,45 +150,76 @@ func (p *Proxy) ProcessResponse(resp *http.Response) *http.Response {
|
||||
}
|
||||
|
||||
func getItemsHash(items []Item) map[string]string {
|
||||
IdHashMap := make(map[string]string)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
idHashMap := sync.Map{} // Use sync.Map for concurrent access
|
||||
|
||||
for _, item := range items {
|
||||
hash := getItemHash(item)
|
||||
IdHashMap[item.GUID] = hash
|
||||
wg.Add(1)
|
||||
go func(item Item) {
|
||||
defer wg.Done()
|
||||
hash := strings.ToLower(getItemHash(item))
|
||||
if hash != "" {
|
||||
idHashMap.Store(item.GUID, hash) // Store directly into sync.Map
|
||||
}
|
||||
}(item)
|
||||
}
|
||||
return IdHashMap
|
||||
wg.Wait()
|
||||
|
||||
// Convert sync.Map to regular map
|
||||
finalMap := make(map[string]string)
|
||||
idHashMap.Range(func(key, value interface{}) bool {
|
||||
finalMap[key.(string)] = value.(string)
|
||||
return true
|
||||
})
|
||||
|
||||
return finalMap
|
||||
}
|
||||
|
||||
func getItemHash(item Item) string {
|
||||
magnetLink := ""
|
||||
infohash := ""
|
||||
|
||||
// Extract magnet link from the link or comments
|
||||
if strings.Contains(item.Link, "magnet:?") {
|
||||
magnetLink = item.Link
|
||||
} else if strings.Contains(item.GUID, "magnet:?") {
|
||||
magnetLink = item.GUID
|
||||
}
|
||||
|
||||
// Extract infohash from <torznab:attr> elements
|
||||
for _, attr := range item.TorznabAttrs {
|
||||
if attr.Name == "infohash" {
|
||||
infohash = attr.Value
|
||||
return attr.Value
|
||||
}
|
||||
}
|
||||
if magnetLink == "" && infohash == "" {
|
||||
|
||||
if strings.Contains(item.GUID, "magnet:?") {
|
||||
magnet, err := common.GetMagnetInfo(item.GUID)
|
||||
if err == nil && magnet != nil && magnet.InfoHash != "" {
|
||||
return magnet.InfoHash
|
||||
}
|
||||
}
|
||||
|
||||
magnetLink := item.Link
|
||||
|
||||
if magnetLink == "" {
|
||||
// We can't check the availability of the torrent without a magnet link or infohash
|
||||
return ""
|
||||
}
|
||||
var magnet *common.Magnet
|
||||
var err error
|
||||
|
||||
if infohash == "" {
|
||||
magnet, err = common.GetMagnetInfo(magnetLink)
|
||||
if err != nil || magnet == nil || magnet.InfoHash == "" {
|
||||
log.Println("Error getting magnet info:", err)
|
||||
return ""
|
||||
if strings.Contains(magnetLink, "magnet:?") {
|
||||
magnet, err := common.GetMagnetInfo(magnetLink)
|
||||
if err == nil && magnet != nil && magnet.InfoHash != "" {
|
||||
return magnet.InfoHash
|
||||
}
|
||||
infohash = magnet.InfoHash
|
||||
}
|
||||
|
||||
//Check Description for infohash
|
||||
hash := common.ExtractInfoHash(item.Description)
|
||||
if hash == "" {
|
||||
// Check Title for infohash
|
||||
hash = common.ExtractInfoHash(item.Comments)
|
||||
}
|
||||
infohash = hash
|
||||
if infohash == "" {
|
||||
//Get torrent file from http link
|
||||
//Takes too long, not worth it
|
||||
//magnet, err := common.OpenMagnetHttpURL(magnetLink)
|
||||
//if err == nil && magnet != nil && magnet.InfoHash != "" {
|
||||
// log.Printf("Magnet: %s", magnet.InfoHash)
|
||||
//}
|
||||
}
|
||||
return infohash
|
||||
|
||||
@@ -198,9 +232,7 @@ func (p *Proxy) ProcessXMLResponse(resp *http.Response) *http.Response {
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
if p.Debug {
|
||||
log.Println("Error reading response body:", err)
|
||||
}
|
||||
log.Println("Error reading response body:", err)
|
||||
return resp
|
||||
}
|
||||
err = resp.Body.Close()
|
||||
@@ -211,12 +243,9 @@ func (p *Proxy) ProcessXMLResponse(resp *http.Response) *http.Response {
|
||||
var rss RSS
|
||||
err = xml.Unmarshal(body, &rss)
|
||||
if err != nil {
|
||||
if p.Debug {
|
||||
log.Printf("Error unmarshalling XML: %v", err)
|
||||
}
|
||||
log.Printf("Error unmarshalling XML: %v", err)
|
||||
return resp
|
||||
}
|
||||
newItems := make([]Item, 0)
|
||||
|
||||
// Step 4: Extract infohash or magnet URI, manipulate data
|
||||
IdsHashMap := getItemsHash(rss.Channel.Items)
|
||||
@@ -226,45 +255,38 @@ func (p *Proxy) ProcessXMLResponse(resp *http.Response) *http.Response {
|
||||
hashes = append(hashes, hash)
|
||||
}
|
||||
}
|
||||
if len(hashes) == 0 {
|
||||
// No infohashes or magnet links found, should we return the original response?
|
||||
return resp
|
||||
}
|
||||
availableHashes := p.Debrid.IsAvailable(hashes)
|
||||
for _, item := range rss.Channel.Items {
|
||||
hash := IdsHashMap[item.GUID]
|
||||
if hash == "" {
|
||||
// newItems = append(newItems, item)
|
||||
continue
|
||||
}
|
||||
isCached, exists := availableHashes[hash]
|
||||
if !exists {
|
||||
// newItems = append(newItems, item)
|
||||
continue
|
||||
}
|
||||
if !isCached {
|
||||
continue
|
||||
}
|
||||
newItems = append(newItems, item)
|
||||
log.Printf("Found %d infohashes/magnet links", len(hashes))
|
||||
availableHashesMap := p.debrid.IsAvailable(hashes)
|
||||
newItems := make([]Item, 0, len(rss.Channel.Items))
|
||||
|
||||
if len(hashes) > 0 {
|
||||
for _, item := range rss.Channel.Items {
|
||||
hash := IdsHashMap[item.GUID]
|
||||
if hash == "" {
|
||||
continue
|
||||
}
|
||||
isCached, exists := availableHashesMap[hash]
|
||||
if !exists || !isCached {
|
||||
continue
|
||||
}
|
||||
newItems = append(newItems, item)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Report: %d/%d items are cached", len(newItems), len(rss.Channel.Items))
|
||||
rss.Channel.Items = newItems
|
||||
|
||||
// rss.Channel.Items = newItems
|
||||
modifiedBody, err := xml.MarshalIndent(rss, "", " ")
|
||||
if err != nil {
|
||||
if p.Debug {
|
||||
log.Printf("Error marshalling XML: %v", err)
|
||||
}
|
||||
log.Printf("Error marshalling XML: %v", err)
|
||||
resp.Body = io.NopCloser(bytes.NewReader(body))
|
||||
return resp
|
||||
}
|
||||
modifiedBody = append([]byte(xml.Header), modifiedBody...)
|
||||
|
||||
// Set the modified body back to the response
|
||||
resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
|
||||
resp.ContentLength = int64(len(modifiedBody))
|
||||
resp.Header.Set("Content-Length", string(rune(len(modifiedBody))))
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
@@ -275,7 +297,7 @@ func UrlMatches(re *regexp.Regexp) goproxy.ReqConditionFunc {
|
||||
}
|
||||
|
||||
func (p *Proxy) Start() {
|
||||
username, password := p.Username, p.Password
|
||||
username, password := p.username, p.password
|
||||
proxy := goproxy.NewProxyHttpServer()
|
||||
if username != "" || password != "" {
|
||||
// Set up basic auth for proxy
|
||||
@@ -285,13 +307,15 @@ func (p *Proxy) Start() {
|
||||
}
|
||||
|
||||
proxy.OnRequest(goproxy.ReqHostMatches(regexp.MustCompile("^.443$"))).HandleConnect(goproxy.AlwaysMitm)
|
||||
proxy.OnResponse(UrlMatches(regexp.MustCompile("^.*/api\\?t=(search|tvsearch|movie)(&.*)?$"))).DoFunc(
|
||||
proxy.OnResponse(
|
||||
UrlMatches(regexp.MustCompile("^.*/api\\?t=(search|tvsearch|movie)(&.*)?$")),
|
||||
goproxy.StatusCodeIs(http.StatusOK, http.StatusAccepted)).DoFunc(
|
||||
func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
|
||||
return p.ProcessResponse(resp)
|
||||
})
|
||||
|
||||
proxy.Verbose = p.Debug
|
||||
portFmt := fmt.Sprintf(":%s", p.Port)
|
||||
proxy.Verbose = p.debug
|
||||
portFmt := fmt.Sprintf(":%s", p.port)
|
||||
log.Printf("[*] Starting proxy server on %s\n", portFmt)
|
||||
log.Fatal(http.ListenAndServe(fmt.Sprintf("%s", portFmt), proxy))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user