First Release
This commit is contained in:
@@ -0,0 +1,169 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"goBlack/common"
|
||||
"goBlack/debrid"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func fileReady(path string) bool {
|
||||
_, err := os.Stat(path)
|
||||
return !os.IsNotExist(err) // Returns true if the file exists
|
||||
}
|
||||
|
||||
func checkFileLoop(wg *sync.WaitGroup, dir string, file debrid.TorrentFile, ready chan<- debrid.TorrentFile) {
|
||||
defer wg.Done()
|
||||
ticker := time.NewTicker(1 * time.Second) // Check every second
|
||||
defer ticker.Stop()
|
||||
path := filepath.Join(dir, file.Path)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if fileReady(path) {
|
||||
ready <- file
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessFiles(arr *debrid.Arr, torrent *debrid.Torrent) {
|
||||
var wg sync.WaitGroup
|
||||
files := torrent.Files
|
||||
ready := make(chan debrid.TorrentFile, len(files))
|
||||
|
||||
log.Println("Checking files...")
|
||||
|
||||
for _, file := range files {
|
||||
wg.Add(1)
|
||||
go checkFileLoop(&wg, arr.Debrid.Folder, file, ready)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(ready)
|
||||
}()
|
||||
|
||||
for r := range ready {
|
||||
log.Println("File is ready:", r.Name)
|
||||
CreateSymLink(arr, torrent)
|
||||
|
||||
}
|
||||
go torrent.Cleanup(true)
|
||||
fmt.Printf("%s downloaded", torrent.Name)
|
||||
}
|
||||
|
||||
func CreateSymLink(config *debrid.Arr, torrent *debrid.Torrent) {
|
||||
path := filepath.Join(config.CompletedFolder, torrent.Folder)
|
||||
err := os.MkdirAll(path, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Printf("Failed to create directory: %s\n", path)
|
||||
}
|
||||
for _, file := range torrent.Files {
|
||||
// Combine the directory and filename to form a full path
|
||||
fullPath := filepath.Join(config.CompletedFolder, file.Path)
|
||||
|
||||
// Create a symbolic link if file doesn't exist
|
||||
_ = os.Symlink(filepath.Join(config.Debrid.Folder, file.Path), fullPath)
|
||||
}
|
||||
}
|
||||
|
||||
func watchFiles(watcher *fsnotify.Watcher, events map[string]time.Time) {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if event.Op&fsnotify.Write == fsnotify.Write {
|
||||
if filepath.Ext(event.Name) == ".torrent" || filepath.Ext(event.Name) == ".magnet" {
|
||||
events[event.Name] = time.Now()
|
||||
}
|
||||
|
||||
}
|
||||
case err, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Println("ERROR:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processFilesDebounced(arr *debrid.Arr, db debrid.Service, events map[string]time.Time, debouncePeriod time.Duration) {
|
||||
ticker := time.NewTicker(1 * time.Second) // Check every second
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
for file, lastEventTime := range events {
|
||||
if time.Since(lastEventTime) >= debouncePeriod {
|
||||
log.Printf("Torrent file detected: %s", file)
|
||||
// Process the torrent file
|
||||
torrent, err := db.Process(arr, file)
|
||||
if err != nil && torrent != nil {
|
||||
// remove torrent file
|
||||
torrent.Cleanup(true)
|
||||
_ = torrent.MarkAsFailed()
|
||||
log.Printf("Error processing torrent file: %s", err)
|
||||
}
|
||||
if err == nil && torrent != nil && len(torrent.Files) > 0 {
|
||||
go ProcessFiles(arr, torrent)
|
||||
}
|
||||
delete(events, file) // remove file from channel
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func StartArr(conf *debrid.Arr, db debrid.Service) {
|
||||
log.Printf("Watching: %s", conf.WatchFolder)
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func(w *fsnotify.Watcher) {
|
||||
err := w.Close()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}(w)
|
||||
events := make(map[string]time.Time)
|
||||
|
||||
go watchFiles(w, events)
|
||||
if err = w.Add(conf.WatchFolder); err != nil {
|
||||
log.Println("Error Watching folder:", err)
|
||||
return
|
||||
}
|
||||
|
||||
processFilesDebounced(conf, db, events, 1*time.Second)
|
||||
}
|
||||
|
||||
func StartBlackhole(config *common.Config, deb debrid.Service) {
|
||||
var wg sync.WaitGroup
|
||||
for _, conf := range config.Arrs {
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
headers := map[string]string{
|
||||
"X-Api-Key": conf.Token,
|
||||
}
|
||||
client := common.NewRLHTTPClient(nil, headers)
|
||||
|
||||
arr := &debrid.Arr{
|
||||
Debrid: config.Debrid,
|
||||
WatchFolder: conf.WatchFolder,
|
||||
CompletedFolder: conf.CompletedFolder,
|
||||
Token: conf.Token,
|
||||
URL: conf.URL,
|
||||
Client: client,
|
||||
}
|
||||
go StartArr(arr, deb)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"goBlack/common"
|
||||
"goBlack/debrid"
|
||||
"log"
|
||||
)
|
||||
|
||||
func Start(config *common.Config) {
|
||||
|
||||
log.Print("[*] BlackHole running")
|
||||
deb := debrid.NewDebrid(config.Debrid)
|
||||
if config.Proxy.Enabled {
|
||||
go StartProxy(config, deb)
|
||||
}
|
||||
StartBlackhole(config, deb)
|
||||
|
||||
}
|
||||
+274
@@ -0,0 +1,274 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"github.com/elazarl/goproxy"
|
||||
"github.com/elazarl/goproxy/ext/auth"
|
||||
"github.com/valyala/fastjson"
|
||||
"goBlack/common"
|
||||
"goBlack/debrid"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type RSS struct {
|
||||
XMLName xml.Name `xml:"rss"`
|
||||
Version string `xml:"version,attr"`
|
||||
Channel Channel `xml:"channel"`
|
||||
}
|
||||
|
||||
type Channel struct {
|
||||
XMLName xml.Name `xml:"channel"`
|
||||
Title string `xml:"title"`
|
||||
AtomLink AtomLink `xml:"link"`
|
||||
Items []Item `xml:"item"`
|
||||
}
|
||||
|
||||
type AtomLink struct {
|
||||
XMLName xml.Name `xml:"link"`
|
||||
Rel string `xml:"rel,attr"`
|
||||
Type string `xml:"type,attr"`
|
||||
}
|
||||
|
||||
type Item struct {
|
||||
XMLName xml.Name `xml:"item"`
|
||||
Title string `xml:"title"`
|
||||
Description string `xml:"description"`
|
||||
GUID string `xml:"guid"`
|
||||
ProwlarrIndexer ProwlarrIndexer `xml:"prowlarrindexer"`
|
||||
Comments string `xml:"comments"`
|
||||
PubDate string `xml:"pubDate"`
|
||||
Size int64 `xml:"size"`
|
||||
Link string `xml:"link"`
|
||||
Categories []string `xml:"category"`
|
||||
Enclosure Enclosure `xml:"enclosure"`
|
||||
TorznabAttrs []TorznabAttr `xml:"torznab:attr"`
|
||||
}
|
||||
|
||||
type ProwlarrIndexer struct {
|
||||
ID string `xml:"id,attr"`
|
||||
Type string `xml:"type,attr"`
|
||||
Value string `xml:",chardata"`
|
||||
}
|
||||
|
||||
type Enclosure struct {
|
||||
URL string `xml:"url,attr"`
|
||||
Length int64 `xml:"length,attr"`
|
||||
Type string `xml:"type,attr"`
|
||||
}
|
||||
|
||||
type TorznabAttr struct {
|
||||
Name string `xml:"name,attr"`
|
||||
Value string `xml:"value,attr"`
|
||||
}
|
||||
|
||||
type SafeItems struct {
|
||||
mu sync.Mutex
|
||||
Items []Item
|
||||
}
|
||||
|
||||
func (s *SafeItems) Add(item Item) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.Items = append(s.Items, item)
|
||||
}
|
||||
|
||||
func (s *SafeItems) Get() []Item {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.Items
|
||||
}
|
||||
|
||||
func ProcessJSONResponse(resp *http.Response, deb debrid.Service) *http.Response {
|
||||
if resp == nil || resp.Body == nil {
|
||||
return resp
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Println("Error reading response body:", err)
|
||||
return resp
|
||||
}
|
||||
err = resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var p fastjson.Parser
|
||||
v, err := p.ParseBytes(body)
|
||||
if err != nil {
|
||||
// If it's not JSON, return the original response
|
||||
resp.Body = io.NopCloser(bytes.NewReader(body))
|
||||
return resp
|
||||
}
|
||||
|
||||
// Modify the JSON
|
||||
|
||||
// Serialize the modified JSON back to bytes
|
||||
modifiedBody := v.MarshalTo(nil)
|
||||
|
||||
// Set the modified body back to the response
|
||||
resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
|
||||
resp.ContentLength = int64(len(modifiedBody))
|
||||
resp.Header.Set("Content-Length", string(rune(len(modifiedBody))))
|
||||
|
||||
return resp
|
||||
|
||||
}
|
||||
|
||||
func ProcessResponse(resp *http.Response, deb debrid.Service) *http.Response {
|
||||
if resp == nil || resp.Body == nil {
|
||||
return resp
|
||||
}
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
switch contentType {
|
||||
case "application/json":
|
||||
return ProcessJSONResponse(resp, deb)
|
||||
case "application/xml":
|
||||
return ProcessXMLResponse(resp, deb)
|
||||
case "application/rss+xml":
|
||||
return ProcessXMLResponse(resp, deb)
|
||||
default:
|
||||
return resp
|
||||
}
|
||||
}
|
||||
|
||||
func XMLItemIsCached(item Item, deb debrid.Service) bool {
|
||||
magnetLink := ""
|
||||
infohash := ""
|
||||
|
||||
// Extract magnet link from the link or comments
|
||||
if strings.Contains(item.Link, "magnet:?") {
|
||||
magnetLink = item.Link
|
||||
} else if strings.Contains(item.GUID, "magnet:?") {
|
||||
magnetLink = item.GUID
|
||||
}
|
||||
|
||||
// Extract infohash from <torznab:attr> elements
|
||||
for _, attr := range item.TorznabAttrs {
|
||||
if attr.Name == "infohash" {
|
||||
infohash = attr.Value
|
||||
}
|
||||
}
|
||||
if magnetLink == "" && infohash == "" {
|
||||
// We can't check the availability of the torrent without a magnet link or infohash
|
||||
return false
|
||||
}
|
||||
var magnet *common.Magnet
|
||||
var err error
|
||||
|
||||
if infohash == "" {
|
||||
magnet, err = common.GetMagnetInfo(magnetLink)
|
||||
if err != nil {
|
||||
log.Println("Error getting magnet info:", err)
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
magnet = &common.Magnet{
|
||||
InfoHash: infohash,
|
||||
Name: item.Title,
|
||||
Link: magnetLink,
|
||||
}
|
||||
}
|
||||
if magnet == nil {
|
||||
log.Println("Error getting magnet info")
|
||||
return false
|
||||
}
|
||||
return deb.IsAvailable(magnet)
|
||||
|
||||
}
|
||||
|
||||
func ProcessXMLResponse(resp *http.Response, deb debrid.Service) *http.Response {
|
||||
if resp == nil || resp.Body == nil {
|
||||
return resp
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Println("Error reading response body:", err)
|
||||
return resp
|
||||
}
|
||||
err = resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var rss RSS
|
||||
err = xml.Unmarshal(body, &rss)
|
||||
if err != nil {
|
||||
log.Fatalf("Error unmarshalling XML: %v", err)
|
||||
return resp
|
||||
}
|
||||
newItems := &SafeItems{}
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Step 4: Extract infohash or magnet URI, manipulate data
|
||||
for _, item := range rss.Channel.Items {
|
||||
wg.Add(1)
|
||||
go func(item Item) {
|
||||
defer wg.Done()
|
||||
if XMLItemIsCached(item, deb) {
|
||||
newItems.Add(item)
|
||||
}
|
||||
}(item)
|
||||
}
|
||||
wg.Wait()
|
||||
rss.Channel.Items = newItems.Get()
|
||||
|
||||
// rss.Channel.Items = newItems
|
||||
modifiedBody, err := xml.MarshalIndent(rss, "", " ")
|
||||
if err != nil {
|
||||
log.Printf("Error marshalling XML: %v", err)
|
||||
return resp
|
||||
}
|
||||
modifiedBody = append([]byte(xml.Header), modifiedBody...)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Error marshalling XML: %v", err)
|
||||
return resp
|
||||
}
|
||||
|
||||
// Set the modified body back to the response
|
||||
resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
|
||||
resp.ContentLength = int64(len(modifiedBody))
|
||||
resp.Header.Set("Content-Length", string(rune(len(modifiedBody))))
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func UrlMatches(re *regexp.Regexp) goproxy.ReqConditionFunc {
|
||||
return func(req *http.Request, ctx *goproxy.ProxyCtx) bool {
|
||||
return re.MatchString(req.URL.String())
|
||||
}
|
||||
}
|
||||
|
||||
func StartProxy(config *common.Config, deb debrid.Service) {
|
||||
username, password := config.Proxy.Username, config.Proxy.Password
|
||||
cfg := config.Proxy
|
||||
proxy := goproxy.NewProxyHttpServer()
|
||||
if username != "" || password != "" {
|
||||
// Set up basic auth for proxy
|
||||
auth.ProxyBasic(proxy, "my_realm", func(user, pwd string) bool {
|
||||
return user == username && password == pwd
|
||||
})
|
||||
}
|
||||
|
||||
proxy.OnRequest(goproxy.ReqHostMatches(regexp.MustCompile("^.443$"))).HandleConnect(goproxy.AlwaysMitm)
|
||||
proxy.OnResponse(UrlMatches(regexp.MustCompile("^.*/api\\?t=(search|tvsearch|movie)(&.*)?$"))).DoFunc(
|
||||
func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
|
||||
return ProcessResponse(resp, deb)
|
||||
})
|
||||
|
||||
port := cmp.Or(cfg.Port, "8181")
|
||||
proxy.Verbose = cfg.Debug
|
||||
port = fmt.Sprintf(":%s", port)
|
||||
log.Printf("Starting proxy server on %s\n", port)
|
||||
log.Fatal(http.ListenAndServe(fmt.Sprintf("%s", port), proxy))
|
||||
}
|
||||
Reference in New Issue
Block a user