Changelog 0.2.0 (#1)

* Changelog 0.2.0
This commit is contained in:
Mukhtar Akere
2024-09-12 06:01:10 +01:00
committed by GitHub
parent 60c6cb32d3
commit 9511f3e99e
25 changed files with 1494 additions and 274 deletions

View File

@@ -1,186 +0,0 @@
package cmd
import (
"fmt"
"github.com/fsnotify/fsnotify"
"goBlack/common"
"goBlack/debrid"
"log"
"os"
"path/filepath"
"sync"
"time"
)
type Blackhole struct {
config *common.Config
deb debrid.Service
cache *common.Cache
}
func NewBlackhole(config *common.Config, deb debrid.Service, cache *common.Cache) *Blackhole {
return &Blackhole{
config: config,
deb: deb,
cache: cache,
}
}
func fileReady(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err) // Returns true if the file exists
}
func checkFileLoop(wg *sync.WaitGroup, dir string, file debrid.TorrentFile, ready chan<- debrid.TorrentFile) {
defer wg.Done()
ticker := time.NewTicker(1 * time.Second) // Check every second
defer ticker.Stop()
path := filepath.Join(dir, file.Path)
for {
select {
case <-ticker.C:
if fileReady(path) {
ready <- file
return
}
}
}
}
func (b *Blackhole) processFiles(arr *debrid.Arr, torrent *debrid.Torrent) {
var wg sync.WaitGroup
files := torrent.Files
ready := make(chan debrid.TorrentFile, len(files))
log.Printf("Checking %d files...", len(files))
for _, file := range files {
wg.Add(1)
go checkFileLoop(&wg, arr.Debrid.Folder, file, ready)
}
go func() {
wg.Wait()
close(ready)
}()
for r := range ready {
log.Println("File is ready:", r.Name)
b.createSymLink(arr, torrent)
}
go torrent.Cleanup(true)
fmt.Printf("%s downloaded", torrent.Name)
}
func (b *Blackhole) createSymLink(arr *debrid.Arr, torrent *debrid.Torrent) {
path := filepath.Join(arr.CompletedFolder, torrent.Folder)
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
log.Printf("Failed to create directory: %s\n", path)
}
for _, file := range torrent.Files {
// Combine the directory and filename to form a full path
fullPath := filepath.Join(path, file.Name) // completedFolder/MyTVShow/MyTVShow.S01E01.720p.mkv
// Create a symbolic link if file doesn't exist
torrentPath := filepath.Join(arr.Debrid.Folder, torrent.Folder, file.Name) // debridFolder/MyTVShow/MyTVShow.S01E01.720p.mkv
_ = os.Symlink(torrentPath, fullPath)
}
}
func watcher(watcher *fsnotify.Watcher, events map[string]time.Time) {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
if filepath.Ext(event.Name) == ".torrent" || filepath.Ext(event.Name) == ".magnet" {
events[event.Name] = time.Now()
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("ERROR:", err)
}
}
}
func (b *Blackhole) processFilesDebounced(arr *debrid.Arr, events map[string]time.Time, debouncePeriod time.Duration) {
ticker := time.NewTicker(1 * time.Second) // Check every second
defer ticker.Stop()
for range ticker.C {
for file, lastEventTime := range events {
if time.Since(lastEventTime) >= debouncePeriod {
log.Printf("Torrent file detected: %s", file)
// Process the torrent file
torrent, err := b.deb.Process(arr, file)
if err != nil && torrent != nil {
// remove torrent file
torrent.Cleanup(true)
_ = torrent.MarkAsFailed()
log.Printf("Error processing torrent file: %s", err)
}
if err == nil && torrent != nil && len(torrent.Files) > 0 {
go b.processFiles(arr, torrent)
}
delete(events, file) // remove file from channel
}
}
}
}
func (b *Blackhole) startArr(arr *debrid.Arr) {
log.Printf("Watching: %s", arr.WatchFolder)
w, err := fsnotify.NewWatcher()
if err != nil {
log.Println(err)
}
defer func(w *fsnotify.Watcher) {
err := w.Close()
if err != nil {
log.Println(err)
}
}(w)
events := make(map[string]time.Time)
go watcher(w, events)
if err = w.Add(arr.WatchFolder); err != nil {
log.Println("Error Watching folder:", err)
return
}
b.processFilesDebounced(arr, events, 1*time.Second)
}
func (b *Blackhole) Start() {
log.Println("[*] Starting Blackhole")
var wg sync.WaitGroup
for _, conf := range b.config.Arrs {
wg.Add(1)
defer wg.Done()
headers := map[string]string{
"X-Api-Key": conf.Token,
}
client := common.NewRLHTTPClient(nil, headers)
arr := &debrid.Arr{
Debrid: b.config.Debrid,
WatchFolder: conf.WatchFolder,
CompletedFolder: conf.CompletedFolder,
Token: conf.Token,
URL: conf.URL,
Client: client,
}
go b.startArr(arr)
}
wg.Wait()
}

View File

@@ -3,33 +3,34 @@ package cmd
import (
"cmp"
"goBlack/common"
"goBlack/debrid"
"goBlack/pkg/debrid"
"goBlack/pkg/proxy"
"goBlack/pkg/qbit"
"sync"
)
func Start(config *common.Config) {
maxCacheSize := cmp.Or(config.MaxCacheSize, 1000)
cache := common.NewCache(maxCacheSize)
deb := debrid.NewDebrid(config.Debrid, cache)
var wg sync.WaitGroup
if config.Proxy.Enabled {
proxy := NewProxy(*config, deb, cache)
p := proxy.NewProxy(*config, deb, cache)
wg.Add(1)
go func() {
defer wg.Done()
proxy.Start()
p.Start()
}()
}
if len(config.Arrs) > 0 {
blackhole := NewBlackhole(config, deb, cache)
if config.QBitTorrent.Port != "" {
qb := qbit.NewQBit(config, deb, cache)
wg.Add(1)
go func() {
defer wg.Done()
blackhole.Start()
qb.Start()
}()
}

View File

@@ -1,321 +0,0 @@
package cmd
import (
"bytes"
"cmp"
"encoding/xml"
"fmt"
"github.com/elazarl/goproxy"
"github.com/elazarl/goproxy/ext/auth"
"github.com/valyala/fastjson"
"goBlack/common"
"goBlack/debrid"
"io"
"log"
"net/http"
"os"
"regexp"
"strings"
"sync"
)
type RSS struct {
XMLName xml.Name `xml:"rss"`
Text string `xml:",chardata"`
Version string `xml:"version,attr"`
Atom string `xml:"atom,attr"`
Torznab string `xml:"torznab,attr"`
Channel struct {
Text string `xml:",chardata"`
Link struct {
Text string `xml:",chardata"`
Rel string `xml:"rel,attr"`
Type string `xml:"type,attr"`
} `xml:"link"`
Title string `xml:"title"`
Items []Item `xml:"item"`
} `xml:"channel"`
}
type Item struct {
Text string `xml:",chardata"`
Title string `xml:"title"`
Description string `xml:"description"`
GUID string `xml:"guid"`
ProwlarrIndexer struct {
Text string `xml:",chardata"`
ID string `xml:"id,attr"`
Type string `xml:"type,attr"`
} `xml:"prowlarrindexer"`
Comments string `xml:"comments"`
PubDate string `xml:"pubDate"`
Size string `xml:"size"`
Link string `xml:"link"`
Category []string `xml:"category"`
Enclosure struct {
Text string `xml:",chardata"`
URL string `xml:"url,attr"`
Length string `xml:"length,attr"`
Type string `xml:"type,attr"`
} `xml:"enclosure"`
TorznabAttrs []struct {
Text string `xml:",chardata"`
Name string `xml:"name,attr"`
Value string `xml:"value,attr"`
} `xml:"attr"`
}
type Proxy struct {
port string
enabled bool
debug bool
username string
password string
cachedOnly bool
debrid debrid.Service
cache *common.Cache
}
func NewProxy(config common.Config, deb debrid.Service, cache *common.Cache) *Proxy {
cfg := config.Proxy
port := cmp.Or(os.Getenv("PORT"), cfg.Port, "8181")
return &Proxy{
port: port,
enabled: cfg.Enabled,
debug: cfg.Debug,
username: cfg.Username,
password: cfg.Password,
cachedOnly: cfg.CachedOnly,
debrid: deb,
cache: cache,
}
}
func (p *Proxy) ProcessJSONResponse(resp *http.Response) *http.Response {
if resp == nil || resp.Body == nil {
return resp
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return resp
}
err = resp.Body.Close()
if err != nil {
return nil
}
var par fastjson.Parser
v, err := par.ParseBytes(body)
if err != nil {
// If it's not JSON, return the original response
resp.Body = io.NopCloser(bytes.NewReader(body))
return resp
}
// Modify the JSON
// Serialize the modified JSON back to bytes
modifiedBody := v.MarshalTo(nil)
// Set the modified body back to the response
resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
resp.ContentLength = int64(len(modifiedBody))
resp.Header.Set("Content-Length", string(rune(len(modifiedBody))))
return resp
}
func (p *Proxy) ProcessResponse(resp *http.Response) *http.Response {
if resp == nil || resp.Body == nil {
return resp
}
contentType := resp.Header.Get("Content-Type")
switch contentType {
case "application/json":
return resp // p.ProcessJSONResponse(resp)
case "application/xml":
return p.ProcessXMLResponse(resp)
case "application/rss+xml":
return p.ProcessXMLResponse(resp)
default:
return resp
}
}
func getItemsHash(items []Item) map[string]string {
var wg sync.WaitGroup
idHashMap := sync.Map{} // Use sync.Map for concurrent access
for _, item := range items {
wg.Add(1)
go func(item Item) {
defer wg.Done()
hash := strings.ToLower(item.getHash())
if hash != "" {
idHashMap.Store(item.GUID, hash) // Store directly into sync.Map
}
}(item)
}
wg.Wait()
// Convert sync.Map to regular map
finalMap := make(map[string]string)
idHashMap.Range(func(key, value interface{}) bool {
finalMap[key.(string)] = value.(string)
return true
})
return finalMap
}
func (item Item) getHash() string {
infohash := ""
for _, attr := range item.TorznabAttrs {
if attr.Name == "infohash" {
return attr.Value
}
}
if strings.Contains(item.GUID, "magnet:?") {
magnet, err := common.GetMagnetInfo(item.GUID)
if err == nil && magnet != nil && magnet.InfoHash != "" {
return magnet.InfoHash
}
}
magnetLink := item.Link
if magnetLink == "" {
// We can't check the availability of the torrent without a magnet link or infohash
return ""
}
if strings.Contains(magnetLink, "magnet:?") {
magnet, err := common.GetMagnetInfo(magnetLink)
if err == nil && magnet != nil && magnet.InfoHash != "" {
return magnet.InfoHash
}
}
//Check Description for infohash
hash := common.ExtractInfoHash(item.Description)
if hash == "" {
// Check Title for infohash
hash = common.ExtractInfoHash(item.Comments)
}
infohash = hash
if infohash == "" {
//Get torrent file from http link
//Takes too long, not worth it
//magnet, err := common.OpenMagnetHttpURL(magnetLink)
//if err == nil && magnet != nil && magnet.InfoHash != "" {
// log.Printf("Magnet: %s", magnet.InfoHash)
//}
}
return infohash
}
func (p *Proxy) ProcessXMLResponse(resp *http.Response) *http.Response {
if resp == nil || resp.Body == nil {
return resp
}
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("Error reading response body:", err)
resp.Body = io.NopCloser(bytes.NewReader(body))
return resp
}
err = resp.Body.Close()
if err != nil {
return nil
}
var rss RSS
err = xml.Unmarshal(body, &rss)
if err != nil {
log.Printf("Error unmarshalling XML: %v", err)
resp.Body = io.NopCloser(bytes.NewReader(body))
return resp
}
indexer := ""
if len(rss.Channel.Items) > 0 {
indexer = rss.Channel.Items[0].ProwlarrIndexer.Text
}
// Step 4: Extract infohash or magnet URI, manipulate data
IdsHashMap := getItemsHash(rss.Channel.Items)
hashes := make([]string, 0)
for _, hash := range IdsHashMap {
if hash != "" {
hashes = append(hashes, hash)
}
}
availableHashesMap := p.debrid.IsAvailable(hashes)
newItems := make([]Item, 0, len(rss.Channel.Items))
if len(hashes) > 0 {
for _, item := range rss.Channel.Items {
hash := IdsHashMap[item.GUID]
if hash == "" {
continue
}
isCached, exists := availableHashesMap[hash]
if !exists || !isCached {
continue
}
newItems = append(newItems, item)
}
}
log.Printf("[%s Report]: %d/%d items are cached || Found %d infohash", indexer, len(newItems), len(rss.Channel.Items), len(hashes))
rss.Channel.Items = newItems
// rss.Channel.Items = newItems
modifiedBody, err := xml.MarshalIndent(rss, "", " ")
if err != nil {
log.Printf("Error marshalling XML: %v", err)
resp.Body = io.NopCloser(bytes.NewReader(body))
return resp
}
modifiedBody = append([]byte(xml.Header), modifiedBody...)
// Set the modified body back to the response
resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
return resp
}
func UrlMatches(re *regexp.Regexp) goproxy.ReqConditionFunc {
return func(req *http.Request, ctx *goproxy.ProxyCtx) bool {
return re.MatchString(req.URL.String())
}
}
func (p *Proxy) Start() {
username, password := p.username, p.password
proxy := goproxy.NewProxyHttpServer()
if username != "" || password != "" {
// Set up basic auth for proxy
auth.ProxyBasic(proxy, "my_realm", func(user, pwd string) bool {
return user == username && password == pwd
})
}
proxy.OnRequest(goproxy.ReqHostMatches(regexp.MustCompile("^.443$"))).HandleConnect(goproxy.AlwaysMitm)
proxy.OnResponse(
UrlMatches(regexp.MustCompile("^.*/api\\?t=(search|tvsearch|movie)(&.*)?$")),
goproxy.StatusCodeIs(http.StatusOK, http.StatusAccepted)).DoFunc(
func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
return p.ProcessResponse(resp)
})
proxy.Verbose = p.debug
portFmt := fmt.Sprintf(":%s", p.port)
log.Printf("[*] Starting proxy server on %s\n", portFmt)
log.Fatal(http.ListenAndServe(fmt.Sprintf("%s", portFmt), proxy))
}